-
-
Save nickodell/5b99512c000ee5f8ce75404c047b017a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import subprocess | |
import tempfile | |
import time | |
# Host handed to ssh/sftp below. It has to be the machine the MySQL server
# runs on, because SELECT ... INTO OUTFILE writes the result file on the
# server's own filesystem, which is then fetched from this host.
db_server = "andre"
def load(table_name, connection):
    """Return the full contents of ``table_name`` as a DataFrame.

    Convenience wrapper: builds a ``SELECT *`` query and delegates to
    ``read_sql_tmpfile``. ``table_name`` is pasted into the SQL text
    unescaped, so it must come from trusted code, never from user input.
    """
    select_all = f'SELECT * FROM {table_name}'
    return read_sql_tmpfile(select_all, connection)
def read_sql_tmpfile(query, connection):
    """Run ``query`` on the MySQL server and load its result into a DataFrame.

    Instead of streaming rows through the DB driver, the result is dumped on
    the database host with ``SELECT ... INTO OUTFILE``, copied here with
    ``sftp`` and parsed with :func:`pandas.read_csv`. The wall-clock duration
    of each stage (seconds, from ``time.perf_counter``) is printed to stdout.

    Args:
        query: SQL SELECT text. It is extended verbatim with a trailing
            ``limit 0`` and ``INTO OUTFILE '...'`` clause, so it must be a
            plain SELECT that accepts both (e.g. no trailing semicolon).
            Never pass untrusted input: it is not escaped.
        connection: DB connection usable by ``pd.read_sql`` and exposing
            ``.execute(sql)`` (presumably a SQLAlchemy connection -- confirm).
            Its DB user needs the MySQL ``FILE`` privilege.

    Returns:
        pandas.DataFrame whose columns are those reported by the query.

    Raises:
        RuntimeError: the remote ``mktemp`` printed no directory name.
        subprocess.CalledProcessError: an ssh/sftp step failed.
        Any error raised by ``connection`` for the SQL statements.
    """
    import csv
    import shlex

    start = time.perf_counter()
    # Create a unique temp dir on the DB host and grant the `mysql` user
    # write+search access (ACL) so the server process can create the OUTFILE
    # there. Ideally this setfacl would be unnecessary, by adjusting group
    # membership so `mysql` can write to directories the ssh user creates.
    # `set -e` makes ssh exit non-zero (-> CalledProcessError) if a step fails.
    cmd = 'set -e; f="$(mktemp -d)"; setfacl -m mysql:wx "$f"; echo "$f"'
    out_mktemp = subprocess.check_output(['ssh', db_server, cmd])
    # Drop the trailing newline from `echo`. Validate after stripping so that
    # whitespace-only output cannot yield an empty path.
    tmp_dir = out_mktemp.decode('ascii').strip()
    if not tmp_dir:
        raise RuntimeError("Can't make tempfile")
    end = time.perf_counter()
    print(f'Duration init: {end - start:.3f}')

    start = time.perf_counter()
    try:
        remote_tmp_file = f'{tmp_dir}/sql_tmpfile'

        # Empty result set, used only to learn the column names: the
        # OUTFILE written below contains no header row.
        headers = pd.read_sql(f'{query} limit 0', con=connection)
        end = time.perf_counter()
        print(f'Duration headers: {end - start:.3f}')

        start = time.perf_counter()
        copy_sql = f"{query} INTO OUTFILE '{remote_tmp_file}'"
        connection.execute(copy_sql)
        end = time.perf_counter()
        print(f'Duration export: {end - start:.3f}')

        start = time.perf_counter()
        # sftp writes the download to local_temp.name; it is then read back
        # through the handle that is already open.
        # NOTE(review): reopening a NamedTemporaryFile by name while it is
        # open works on POSIX but not on Windows.
        with tempfile.NamedTemporaryFile(mode='rb') as local_temp:
            subprocess.check_call(
                ['sftp', f'{db_server}:{remote_tmp_file}', local_temp.name]
            )
            end = time.perf_counter()
            print(f'Duration copy: {end - start:.3f}')

            start = time.perf_counter()
            # With INTO OUTFILE's default FIELDS/LINES options, columns are
            # tab-separated, fields are never enclosed in quotes, and NULL is
            # written as \N. Tell pandas exactly that: map \N to NaN (on top
            # of its default NA markers) and treat '"' as an ordinary
            # character. Known limitation: backslash-escaped tabs/newlines
            # inside values are still not decoded.
            df = pd.read_csv(
                local_temp,
                delimiter='\t',
                header=None,
                names=headers.columns,
                na_values=[r'\N'],
                quoting=csv.QUOTE_NONE,
            )
            end = time.perf_counter()
            print(f'Duration read: {end - start:.3f}')
    finally:
        start = time.perf_counter()
        # Always remove the remote temp dir, also on failure. The path is
        # shell-quoted because ssh hands the command to a remote shell.
        subprocess.check_call(['ssh', db_server, f'rm -r {shlex.quote(tmp_dir)}'])
        end = time.perf_counter()
        print(f'Duration cleanup: {end - start:.3f}')
    return df
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pd.read_sql() timings:
Duration total: 11.356

sftp timings (seconds, one line per stage printed by read_sql_tmpfile()):
Duration init: 0.858
Duration headers: 0.008
Duration export: 4.608
Duration copy: 1.966
Duration read: 0.863
Duration cleanup: 0.896
Duration total: 9.199
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment