Skip to content

Instantly share code, notes, and snippets.

@amcclosky
Last active May 26, 2019 21:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amcclosky/76fc3b6db8f0cffd26bc97ae6535c95d to your computer and use it in GitHub Desktop.
Save amcclosky/76fc3b6db8f0cffd26bc97ae6535c95d to your computer and use it in GitHub Desktop.
class CSVImportMixin:
    """Add a ``from_csv`` bulk-upsert classmethod to a SQLAlchemy model class.

    A column takes part in the import when its ``info`` dict carries a
    ``"csv_column"`` entry naming the CSV header it is loaded from.
    """

    # Name of the table the CSV rows are written to; ``None`` on the mixin
    # itself (presumably overridden by the concrete model -- confirm).
    __tablename__ = None
    # Column names used as the ON CONFLICT target when no constraint is given.
    __upsert_keys__ = ("id",)

    @classmethod
    def _columns(cls):
        """Return the columns reported by ``get_columns(cls)`` as a list."""
        return list(get_columns(cls))

    @classmethod
    def _csv_mapping(cls):
        """Return ``{csv header: column name}`` for columns declaring ``csv_column``."""
        return {
            column.info["csv_column"]: column.name
            for column in cls._columns()
            if "csv_column" in column.info
        }

    @classmethod
    def from_csv(cls, filename, constraint=None, chunksize=None, null_mapper=None, session=None):
        """Load a CSV file and upsert its rows into this model's table.

        Args:
            filename: Path or buffer handed to ``pandas.read_csv``.
            constraint: Optional constraint used as the conflict target.  When
                falsy, ``cls.__upsert_keys__`` is used as ``index_elements``.
            chunksize: Forwarded to ``DataFrame.to_sql``.
            null_mapper: Optional ``{column name: replacement}`` dict; a value
                that is ``None`` is replaced by ``null_mapper.get(column)``.
            session: Optional SQLAlchemy session (its bind is used) or an
                Engine/Connection (used as is).  Defaults to ``db.session``.

        Raises:
            KeyError: If a CSV header has no ``csv_column`` mapping.

        NOTE(review): relies on ``insert`` being a dialect-specific insert that
        provides ``on_conflict_do_update`` (e.g. PostgreSQL) -- confirm against
        the file's imports.
        """
        import pandas as pd

        if null_mapper is None:
            null_mapper = {}

        # Build the header lookup once instead of once per CSV column.  Headers
        # are matched lower-cased, so also index the mapping lower-cased; exact
        # keys are applied last so previously working lookups are unchanged.
        mapping = cls._csv_mapping()
        lookup = {str(header).lower(): name for header, name in mapping.items()}
        lookup.update(mapping)

        def to_column_name(header):
            try:
                return lookup[header.lower()]
            except KeyError:
                raise KeyError(
                    "CSV column {!r} has no 'csv_column' mapping on {}".format(
                        header, cls.__name__
                    )
                ) from None

        csv_data = pd.read_csv(filename).rename(to_column_name, axis="columns")

        def upsert(_table, conn, keys, data_iter):
            """``to_sql`` insertion method: INSERT ... ON CONFLICT DO UPDATE."""
            # No ``bind=``: the statement is executed explicitly on ``conn``.
            stmt = insert(cls.__table__)
            # Indexed access is safe for column names that clash with
            # collection attributes such as "keys" or "values".
            on_conflict = {"set_": {key: stmt.excluded[key] for key in keys}}
            if constraint:
                on_conflict["constraint"] = constraint
            else:
                on_conflict["index_elements"] = cls.__upsert_keys__
            stmt = stmt.on_conflict_do_update(**on_conflict)

            rows = [
                {
                    key: value if value is not None else null_mapper.get(key)
                    for key, value in zip(keys, row)
                }
                for row in data_iter
            ]
            conn.execute(stmt.values(rows))

        # ``to_sql`` needs an Engine/Connection: resolve it from a session, but
        # keep accepting an Engine/Connection passed through ``session``.
        target = db.session if session is None else session
        get_bind = getattr(target, "get_bind", None)
        con = get_bind() if callable(get_bind) else target

        csv_data.to_sql(
            cls.__tablename__,
            if_exists="append",
            index=False,
            method=upsert,
            con=con,
            chunksize=chunksize,
        )
@amcclosky
Copy link
Author

SQLAlchemy model mixin for upserting data from a CSV file.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment