Skip to content

Instantly share code, notes, and snippets.

@Stiivi
Created November 26, 2014 21:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Stiivi/17c0dc62acb04caccee3 to your computer and use it in GitHub Desktop.
Save Stiivi/17c0dc62acb04caccee3 to your computer and use it in GitHub Desktop.
Simple table → table UPSERT with mapping (using SQLAlchemy)
from sqlalchemy import sql
from collections import OrderedDict
def simple_upsert(source, target, source_key, target_key, mapping):
    """Build a transformative upsert from the `source` table into `target`.

    `source_key` is the name of a unique key column in `source` and
    `target_key` is the name of its equivalent column in `target`.
    `mapping` is a dictionary that maps target column names to source column
    names; it selects which target columns both statements write.

    Returns a tuple of two statements ``(update, insert)``; they are only
    constructed here, not executed:

    * ``update`` sets the mapped target columns from the source rows whose
      key already exists in `target`. The source data is referenced through
      a derived table (``__existing__``), so the rendered statement relies on
      the dialect supporting ``UPDATE ... FROM`` -- NOTE(review): confirm for
      the target backend.
    * ``insert`` copies the source rows whose key is not yet present in
      `target`, writing the key column plus every mapped column.
    """
    # Preparation
    source_key_col = source.columns[source_key]
    target_key_col = target.columns[target_key]

    # UPDATE
    # ======
    # Restrict the source to the rows whose key already exists in the target.
    inner_match = source_key_col == target_key_col
    existing = sql.expression.select(
        source.columns,
        from_obj=source.join(target, onclause=inner_match),
    ).alias("__existing__")

    update_values = OrderedDict(
        (target_name, existing.columns[src_name])
        for target_name, src_name in mapping.items()
    )
    update_condition = target_key_col == existing.columns[source_key]
    # The dict is passed positionally (not as **kwargs) so the SET clause
    # keeps the order given by `mapping`.
    update = target.update(update_condition).values(update_values)

    # INSERT
    # ======
    # 1. Get the keys of the new rows:
    #
    #    news = source keys - target keys
    #
    # EXCEPT is positional and takes its column name from the first select,
    # so the result exposes the key under `source_key`.
    target_keys = sql.expression.select([target_key_col], from_obj=target)
    source_keys = sql.expression.select([source_key_col], from_obj=source)
    news = source_keys.except_(target_keys).alias("__news__")

    # 2. Map each target column name to the source expression feeding it;
    #    the key column comes first.
    column_mapping = OrderedDict()
    column_mapping[target_key] = source_key_col
    for target_col, src_col in mapping.items():
        column_mapping[target_col] = source.columns[src_col]

    # Keep only the new source rows using an INNER JOIN with the new keys.
    news_condition = source_key_col == news.columns[source_key]
    news_join = source.join(news, news_condition)

    # INSERT ... SELECT matches columns by position. Labelling every
    # expression with its target name yields one distinct output column per
    # target column, even when the same source column feeds several targets.
    select_columns = [col.label(name) for name, col in column_mapping.items()]
    new_rows = sql.expression.select(select_columns, from_obj=news_join)
    insert = target.insert().from_select(list(column_mapping), new_rows)

    return (update, insert)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment