@nirizr
Last active September 28, 2023 23:19
sqlalchemy upsert supporting delayed ORM insertion and duplicate removal (inside a single query)
from sqlalchemy import UniqueConstraint, inspect
from sqlalchemy.dialects import postgresql


def upsert(session, model, rows):
    """Bulk-upsert `rows` (a list of dicts) into `model`'s table on PostgreSQL."""
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    # On conflict, overwrite every non-primary-key column with the incoming value.
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    # Map each foreign-key column name to the column it references.
    foreign_keys = {col.name: list(col.foreign_keys)[0].column
                    for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints
                          if isinstance(c, UniqueConstraint)]

    def handle_foreignkeys_constraints(row):
        # Replace an ORM object stored under the referenced table's name
        # with the value of the column the foreign key points at.
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        # Drop rows that repeat a unique-constraint value seen earlier in this batch.
        for const in unique_constraints:
            unique = tuple([const] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
@nirizr
Author

nirizr commented Jul 28, 2018

This upsert helper lets rows reference ORM objects that have not been committed yet; the foreign key values are read from those objects only when the upsert runs. It also removes duplicates within the batch, which PostgreSQL does not handle: if the same row appears twice in a single upsert statement, PostgreSQL still raises an error.

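For example, given the following models:

from sqlalchemy import Column, ForeignKey, Integer

class ParentModel(Model):
    # Assumed table name: upsert() looks up the parent object under this key.
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)

class ChildModel(Model):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    child_number = Column(Integer)
    parent_id = Column(Integer, ForeignKey(ParentModel.id))

usage could look something like this: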

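upsert_rows = []

for n, parent in enumerate([ParentModel(), ParentModel()]):
    session.add(parent)
    for i in range(10):
        # The 'parent' key must match the parent table's name. Primary keys are kept
        # unique here because only UniqueConstraints are de-duplicated by upsert().
        child_model = {'id': n * 10 + i, 'parent': parent, 'child_number': i}
        upsert_rows.append(child_model)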

# Parent objects will only be committed now, and their `id` fields will be populated.
session.commit()

# upsert will now fetch `ParentModel.id`s and will populate the `parent_id` fields.
upsert(session, ChildModel, upsert_rows)

Without this upsert, you would be required to commit each parent at the start of its loop iteration so that its id is available, inserting every ParentModel object individually, instead of committing once after the entire for block.
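
The deferred foreign key lookup can be sketched without a database. This is only an illustration of the idea, not the gist's code: `resolve_foreign_keys` and `FOREIGN_KEYS` are hypothetical names, and `SimpleNamespace` stands in for an ORM object whose `id` is filled in by the commit.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the gist's `foreign_keys` mapping:
# FK column name -> (row key holding the parent object, parent attribute to read).
FOREIGN_KEYS = {"parent_id": ("parent", "id")}


def resolve_foreign_keys(row, foreign_keys=FOREIGN_KEYS):
    """Return a copy of `row` with parent objects replaced by their key values."""
    resolved = dict(row)
    for fk_column, (object_key, parent_attr) in foreign_keys.items():
        parent = resolved.pop(object_key, None)
        resolved[fk_column] = getattr(parent, parent_attr) if parent is not None else None
    return resolved


parent = SimpleNamespace(id=None)  # no id yet: nothing has been committed
row = {"id": 1, "parent": parent, "child_number": 1}

parent.id = 42  # simulates the id being populated by the commit
print(resolve_foreign_keys(row))
# {'id': 1, 'child_number': 1, 'parent_id': 42}
```

Because the row holds a reference to the object rather than a copy of its id, the value is read only at resolution time, which is why a single commit before the upsert is enough.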

Additionally, rows that would violate a UniqueConstraint within a single query are filtered out before the query is sent, to avoid PostgreSQL errors.
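
As a rough sketch of that filtering step in plain Python: `drop_local_duplicates` and `unique_column_sets` are hypothetical names, and unlike the gist this version checks every constraint before recording any of a row's values.

```python
def drop_local_duplicates(rows, unique_column_sets):
    """Keep only the first row for each combination of unique-column values."""
    seen = set()
    kept = []
    for row in rows:
        # One key per constraint: the column names plus this row's values for them.
        keys = [(cols, tuple(row[c] for c in cols)) for cols in unique_column_sets]
        if any(key in seen for key in keys):
            continue  # repeats a value combination already in the batch
        seen.update(keys)
        kept.append(row)
    return kept


rows = [
    {"parent_id": 1, "child_number": 0},
    {"parent_id": 1, "child_number": 0},  # local duplicate, dropped
    {"parent_id": 2, "child_number": 0},
]
deduped = drop_local_duplicates(rows, [("parent_id", "child_number")])
print(len(deduped))
# 2
```

The first occurrence wins here, which matches the order-dependent behaviour of the `seen` set in the gist.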

@paultop6

paultop6 commented Feb 8, 2022

Hi. I have a quick question about this gist. In handle_foreignkeys_constraints, what is the purpose of re-assigning row[c_name]?

@nirizr
Author

nirizr commented Mar 13, 2022

This was written a long time ago and honestly I don't recall. I probably should have documented that snippet better...

I wrote this snippet while writing this SO answer, so it might be helpful in understanding the reason: https://stackoverflow.com/a/51567630/1146713

I think this was to keep foreign key values after or before the SQL upsert. Sorry I don't have a better answer.
