Skip to content

Instantly share code, notes, and snippets.

@nickodell
Created January 21, 2023 23:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nickodell/5b99512c000ee5f8ce75404c047b017a to your computer and use it in GitHub Desktop.
Save nickodell/5b99512c000ee5f8ce75404c047b017a to your computer and use it in GitHub Desktop.
import shlex
import subprocess
import tempfile
import time

import pandas as pd
# Host name handed to ssh/sftp: the temporary export directory is created on,
# fetched from and removed from this machine.
# NOTE(review): presumably this is the host running the MySQL server, since the
# server itself writes the INTO OUTFILE result there - confirm.
db_server = 'andre'
def load(table_name, connection):
    """Read every row of ``table_name`` into a DataFrame via ``read_sql_tmpfile``.

    ``table_name`` is pasted into the SQL text as-is, so it must come from a
    trusted source.
    """
    select_all = f'SELECT * FROM {table_name}'
    return read_sql_tmpfile(select_all, connection)
def read_sql_tmpfile(query, connection):
    """Run ``query`` through ``SELECT ... INTO OUTFILE`` and load the result.

    Steps (the wall-clock duration of each one is printed):

    1. init    - create a temp directory on ``db_server`` over ssh and give
                 the ``mysql`` user write/search access to it via ``setfacl``.
    2. headers - run ``query`` with ``limit 0`` to learn the column names.
    3. export  - run ``query`` with ``INTO OUTFILE`` so the server writes the
                 rows into that directory.
    4. copy    - fetch the file with sftp into a local temporary file.
    5. read    - parse the tab-separated file with pandas.
    6. cleanup - remove the remote directory (always attempted).

    Args:
        query: SQL text of a SELECT statement. `` limit 0`` and
            `` INTO OUTFILE ...`` are appended to it verbatim, so it must not
            end with a semicolon or carry its own LIMIT / INTO clause. It is
            not escaped in any way - pass trusted SQL only.
        connection: Passed to ``pd.read_sql`` as ``con`` and must also offer
            ``execute(sql_string)``. The database user needs the ``FILE``
            privilege.

    Returns:
        A ``pandas.DataFrame`` with the query's column names.

    Raises:
        subprocess.CalledProcessError: If an ssh or sftp command fails.
        RuntimeError: If the remote side did not report a directory path.
    """
    phase_start = time.perf_counter()

    def report(label):
        # Print the time spent since the previous phase and start a new one.
        nonlocal phase_start
        now = time.perf_counter()
        print(f'Duration {label}: {now - phase_start:.3f}')
        phase_start = time.perf_counter()

    # Create a unique temp directory on the server side and let the `mysql`
    # user write into it. `set -e` keeps a failing step from being masked by
    # the exit status of the final `echo`.
    make_dir = 'set -e; f="$(mktemp -d)"; setfacl -m mysql:wx "$f"; echo "$f"'
    out_mktemp = subprocess.check_output(['ssh', db_server, make_dir])
    tmp_dir = out_mktemp.decode('ascii').strip()
    # Validate *after* stripping: whitespace-only output would otherwise yield
    # an empty path, i.e. an export to '/sql_tmpfile' and a bare `rm -r`.
    if not tmp_dir:
        raise RuntimeError("Can't make tempfile")
    report('init')

    try:
        remote_tmp_file = f'{tmp_dir}/sql_tmpfile'

        # The export file carries no header row, so fetch column names first.
        headers = pd.read_sql(f'{query} limit 0', con=connection)
        report('headers')

        # Remember: the connection's user needs the `FILE` privilege.
        # Think about SQL injection: `query` is interpolated as-is.
        copy_sql = f"{query} INTO OUTFILE '{remote_tmp_file}'"
        connection.execute(copy_sql)
        report('export')

        with tempfile.NamedTemporaryFile(mode='rb') as local_temp:
            # Copy the remote file over the (still open) local temp file.
            subprocess.check_call(
                ['sftp', f'{db_server}:{remote_tmp_file}', local_temp.name]
            )
            report('copy')

            # With MySQL's default export format NULL is written as \N;
            # map it to NA instead of loading it as a literal string.
            # NOTE(review): tabs/newlines inside values are backslash-escaped
            # by the default format and are not un-escaped here - confirm the
            # exported data never contains them.
            df = pd.read_csv(
                local_temp,
                delimiter='\t',
                header=None,
                names=headers.columns,
                na_values=[r'\N'],
            )
            report('read')
    finally:
        # Restart the clock so a failure above does not inflate this number.
        phase_start = time.perf_counter()
        # Quote the path: the string is interpreted by the remote shell.
        subprocess.check_call(
            ['ssh', db_server, f'rm -r {shlex.quote(tmp_dir)}']
        )
        report('cleanup')
    return df
pd.read_sql() timings:
Duration total: 11.356
sftp timings:
Duration init: 0.858
Duration headers: 0.008
Duration export: 4.608
Duration copy: 1.966
Duration read: 0.863
Duration cleanup: 0.896
Duration total: 9.199
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment