Skip to content

Instantly share code, notes, and snippets.

@rvaidya
Last active June 27, 2024 08:01
Show Gist options
  • Save rvaidya/c78f34b3c29ff087d8700297c0ca3b5b to your computer and use it in GitHub Desktop.
Save rvaidya/c78f34b3c29ff087d8700297c0ca3b5b to your computer and use it in GitHub Desktop.
Dump a database table to a parquet file using sqlalchemy and fastparquet. Useful for loading large tables into pandas / Dask, since read_sql_table will hammer the server with queries if the number of partitions/chunks is high. With this you write a temporary parquet file, then use read_parquet to get the data into a DataFrame.
import pandas as pd
import numpy as np
import fastparquet
from sqlalchemy import create_engine, schema, Table
# Copied from pandas with modifications
def __get_dtype(column, sqltype):
    """Pick the dtype a DataFrame column should be cast to for a SQL column.

    Adapted from the SQL-type mapping in pandas, with changes for parquet
    output (nullable integers become floats, dates/timestamps become
    ``datetime64`` dtypes).

    Args:
        column: The reflected SQLAlchemy column; only its ``nullable`` flag is
            consulted (for integer types).
        sqltype: The SQLAlchemy type instance of ``column``.

    Returns:
        A value suitable for ``DataFrame.astype``: a Python type, a numpy
        dtype, or a pandas extension dtype. ``object`` is the catch-all.
    """
    # Import the mssql BIT type explicitly: reaching it through attribute
    # access on ``sqlalchemy.dialects`` only works if the mssql dialect module
    # has already been imported by something else.
    from sqlalchemy.dialects.mssql import BIT
    from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
                                  Date, TIMESTAMP)

    naive_datetime = np.dtype('datetime64[ns]')

    if isinstance(sqltype, Float):
        return float
    if isinstance(sqltype, Integer):
        # An int64 DataFrame column cannot hold NULLs, so nullable integer
        # columns are stored as floats (NULL becomes NaN).
        if column.nullable:
            return float
        # TODO: Refine integer size.
        return np.dtype('int64')
    # TIMESTAMP must be tested before DateTime because it is the
    # timezone-capable subtype.
    if isinstance(sqltype, TIMESTAMP):
        if not sqltype.timezone:
            return naive_datetime
        # astype() needs a dtype *instance*; the bare DatetimeTZDtype class
        # (which was also never imported here) cannot be used.
        # NOTE(review): values are normalized to UTC - confirm this suits
        # the drivers in use.
        return pd.DatetimeTZDtype(tz='UTC')
    if isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return naive_datetime
    if isinstance(sqltype, Date):
        # numpy has no ``date`` attribute; represent dates as midnight
        # timestamps so the column has a concrete dtype.
        return naive_datetime
    if isinstance(sqltype, Boolean):
        return bool
    if isinstance(sqltype, BIT):
        # Handling database provider specific types
        return np.dtype('u1')
    # Catch all type - handle provider specific types in another branch above
    return object
def __write_parquet(output_path: str, batch_array, column_dict, write_index: bool,
                    compression: str, append: bool):
    """Write one batch of rows to the parquet file at ``output_path``.

    Args:
        output_path: Destination parquet file.
        batch_array: The rows of this batch, one entry per row.
        column_dict: Mapping of column name to target dtype; its keys supply
            the DataFrame column labels and its values drive the cast.
        write_index: Forwarded to ``fastparquet.write``.
        compression: Forwarded to ``fastparquet.write``.
        append: Forwarded to ``fastparquet.write``; the first batch written
            to a file needs ``False``, later batches ``True``.
    """
    # Build the frame for this batch and cast every column to the dtype that
    # was derived from the matching sqlalchemy column.
    frame = pd.DataFrame(batch_array, columns=column_dict).astype(dtype=column_dict)

    fastparquet.write(
        output_path,
        frame,
        write_index=write_index,
        compression=compression,
        append=append,
    )
def table_to_parquet(output_path: str, table_name: str, con,
                     batch_size: int = 10000, write_index: bool = True,
                     compression: str = None):
    """Dump a whole database table to a parquet file in batches.

    The table is reflected with SQLAlchemy, selected in full, and written to
    ``output_path`` ``batch_size`` rows at a time so the complete table never
    has to be held in one DataFrame.

    Args:
        output_path: Destination parquet file.
        table_name: Name of the table to reflect and dump.
        con: Database URL passed to ``create_engine``.
        batch_size: Number of rows fetched and written per batch.
        write_index: Forwarded to ``fastparquet.write``.
        compression: Forwarded to ``fastparquet.write``.

    Note:
        If the table has no rows, nothing is written and no file is created.
    """
    db_engine = create_engine(con)
    try:
        # Reflect the table definition. ``autoload_with`` replaces the
        # bound-metadata / ``autoload=True`` form, which newer SQLAlchemy
        # releases no longer accept.
        db_metadata = schema.MetaData()
        db_table = Table(table_name, db_metadata, autoload_with=db_engine)

        # Column names come back as SQLAlchemy ``quoted_name`` objects, which
        # fastparquet rejects ("Expected unicode, got quoted_name"); convert
        # them to plain ``str``.
        column_dict = {
            str(column.name): __get_dtype(column, column.type)
            for column in db_table.columns
        }

        # Run the query on an explicit connection so the cursor and the
        # connection are released even if a batch fails to write.
        with db_engine.connect() as connection:
            result = connection.execute(db_table.select())
            try:
                # The first write creates the file; every later one appends.
                append = False
                row_batch = result.fetchmany(size=batch_size)
                while row_batch:
                    __write_parquet(output_path, row_batch,
                                    column_dict, write_index, compression, append)
                    append = True
                    row_batch = result.fetchmany(size=batch_size)
            finally:
                result.close()
    finally:
        # The engine was created here, so its connection pool is ours to free.
        db_engine.dispose()
@TrentGlover
Copy link

I've been using this code for a while. After I migrated it to a new machine, it started throwing the error below. I'm no Python expert by any means, but I've been attempting to troubleshoot it for several days.

File "C:\appl\python\Code\PythonTest\venv\lib\site-packages\fastparquet\writer.py", line 1499, in write_thrift
return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'

@Upgwades
Copy link

Replacing:
column_dict[column.name] = dtype
with
column_dict[str(column.name)] = dtype
resolved the issue. It seems column.name is not a plain str but SQLAlchemy's quoted_name type, which fastparquet refuses to serialize.

@juliobetta
Copy link

Replacing: column_dict[column.name] = dtype with column_dict[str(column.name)] = dtype resolved the issue. Seems like the column.name is not really a string.

@Upgwades THANK YOU!!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment