Skip to content

Instantly share code, notes, and snippets.

@mw3i
Created September 7, 2023 15:04
Show Gist options
  • Save mw3i/67be51f38ec16443f9dc1863e082204b to your computer and use it in GitHub Desktop.
Save mw3i/67be51f38ec16443f9dc1863e082204b to your computer and use it in GitHub Desktop.
Prototype: Dataframe Upsert
'''
Prototype: upsert a pandas DataFrame into a SQL table with SQLAlchemy.
Built with ChatGPT; still being tested.
'''
import time
import pandas as pd
from sqlalchemy import create_engine, Table, MetaData, select, insert, update, bindparam
def _to_records(frame):
    """Return *frame* as a list of row dicts with missing values as ``None``.

    Casting to ``object`` before ``where`` keeps the ``None`` replacement from
    being coerced back to NaN, so NaN/NaT cells reach the database as NULL.
    """
    return frame.astype(object).where(frame.notna(), None).to_dict(orient="records")


def _existing_ids(conn, table, id_column, ids, chunk_size=1000):
    """Return the set of values in *ids* already present in ``table.<id_column>``.

    The IN-list is split into chunks of *chunk_size* so a very large DataFrame
    does not produce one statement with an unbounded number of parameters.
    """
    key = table.c[id_column]
    found = set()
    for start in range(0, len(ids), chunk_size):
        chunk = ids[start:start + chunk_size]
        found.update(conn.execute(select(key).where(key.in_(chunk))).scalars())
    return found


def upsert_dataframe_to_sql(
    dataframe,
    table_name,
    id_column="id",
    verbose=True,
    engine=None,
    database_url="postgresql://username:password@localhost:5432/database",
):
    """
    Upsert a Pandas DataFrame into a SQL table using SQLAlchemy.

    Rows whose ``id_column`` value already exists in the table are updated
    (every other DataFrame column is written). The remaining rows are inserted.
    The row count, existence check, updates and inserts all run inside one
    transaction, so an error rolls back the whole upsert.

    Requires SQLAlchemy 1.4 or newer. It uses ``autoload_with``, ``select(col)``,
    ``Result.scalars()`` and ``Engine.begin()``.

    Args:
        dataframe (pd.DataFrame): The DataFrame to upsert. Every column must
            exist in the target table. NaN/NaT values are written as NULL.
        table_name (str): The name of the SQL table to upsert into.
        id_column (str, optional): The name of the primary key or unique column.
            Its values must be unique within ``dataframe``. Defaults to "id".
        verbose (bool, optional): Whether to print verbose information. Defaults to True.
        engine (sqlalchemy.engine.Engine, optional): Engine to use. When omitted,
            one is created from ``database_url`` and disposed of afterwards.
        database_url (str, optional): Connection URL, used only when ``engine``
            is None. Defaults to a placeholder PostgreSQL URL.

    Returns:
        None

    Raises:
        KeyError: If ``id_column`` is not a column of ``dataframe``.
        ValueError: If ``id_column`` contains duplicate values, or if
            ``dataframe`` has columns that the table does not.
    """
    start_time = time.perf_counter()

    if id_column not in dataframe.columns:
        raise KeyError(f"id column {id_column!r} is not a column of the DataFrame")
    if dataframe[id_column].duplicated().any():
        # Duplicate keys make the result order-dependent, and the INSERT would
        # violate the key constraint anyway.
        raise ValueError(f"DataFrame has duplicate values in id column {id_column!r}")

    # Local import: `func` is not part of the module-level sqlalchemy import.
    from sqlalchemy import func

    owns_engine = engine is None
    if owns_engine:
        engine = create_engine(database_url)
    try:
        # Reflect the table definition from the database.
        table = Table(table_name, MetaData(), autoload_with=engine)
        unknown = set(dataframe.columns) - set(table.columns.keys())
        if unknown:
            raise ValueError(
                f"DataFrame columns not in table {table_name!r}: {sorted(map(str, unknown))}"
            )
        value_columns = [col for col in dataframe.columns if col != id_column]

        # engine.begin() commits on success and rolls back on any exception.
        with engine.begin() as conn:
            existing_rows_count = conn.execute(
                select(func.count()).select_from(table)
            ).scalar()

            existing_ids = _existing_ids(
                conn, table, id_column, dataframe[id_column].tolist()
            )
            # isin() keeps the DataFrame's own index, so the mask always aligns.
            is_existing = dataframe[id_column].isin(existing_ids)
            update_data = dataframe[is_existing]
            insert_data = dataframe[~is_existing]

            # With no non-key columns there is nothing to SET for existing rows.
            if not update_data.empty and value_columns:
                # Bind names must differ from column names, or SQLAlchemy
                # rejects the executemany UPDATE.
                bind_names = {col: f"upsert_val_{i}" for i, col in enumerate(value_columns)}
                update_stmt = (
                    update(table)
                    .where(table.c[id_column] == bindparam("upsert_key"))
                    .values({table.c[col]: bindparam(name) for col, name in bind_names.items()})
                )
                params = [
                    {
                        "upsert_key": row[id_column],
                        **{name: row[col] for col, name in bind_names.items()},
                    }
                    for row in _to_records(update_data)
                ]
                conn.execute(update_stmt, params)

            if not insert_data.empty:
                conn.execute(table.insert(), _to_records(insert_data))
    finally:
        if owns_engine:
            engine.dispose()

    execution_time = time.perf_counter() - start_time
    total_rows_before_upsert = existing_rows_count
    # Counts DataFrame rows whose key already existed in the table.
    rows_updated = len(update_data)
    rows_inserted = len(insert_data)
    # Updates modify existing rows in place; only inserts grow the table.
    total_rows_after_upsert = total_rows_before_upsert + rows_inserted
    if verbose:
        print(f"Total rows before upsert: {total_rows_before_upsert}")
        print(f"Rows updated: {rows_updated}")
        print(f"Rows inserted: {rows_inserted}")
        print(f"Total rows after upsert: {total_rows_after_upsert}")
        print(f"Execution time: {execution_time:.2f} seconds")
# Example usage:
# upsert_dataframe_to_sql(my_dataframe, "my_table", id_column="my_id")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment