Created
November 26, 2014 21:33
-
-
Save Stiivi/17c0dc62acb04caccee3 to your computer and use it in GitHub Desktop.
Simple table → table UPSERT with mapping (using SQLAlchemy)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy import sql | |
from collections import OrderedDict | |
def simple_upsert(source, target, source_key, target_key, mapping):
    """Build a simple transformative upsert from `source` into `target`.

    `source_key` is the name of the unique key column in the `source` table
    and `target_key` is the name of its equivalent in the `target` table.
    `mapping` is a dictionary that maps target column names to source column
    names.

    Returns a tuple of two statements (`update`, `insert`). The statements
    are only constructed here; nothing is executed by this function.
    """

    # Preparation
    #
    source_key_col = source.columns[source_key]
    target_key_col = target.columns[target_key]

    # UPDATE
    # ======
    # Restrict the source to rows whose key already exists in the target by
    # joining both tables on the key, then expose the result as a subquery.
    key_match = source_key_col == target_key_col
    matched = source.join(target, onclause=key_match)
    existing = sql.expression.select(source.columns, from_obj=matched)
    existing = existing.alias("__existing__")

    # Target column name -> column of the filtered source to copy from.
    values = OrderedDict(
        (target_name, existing.columns[src_name])
        for target_name, src_name in mapping.items()
    )

    # `existing` now contains the filtered source
    match_condition = target_key_col == existing.columns[source_key]
    update = target.update(match_condition).values(**values)

    # INSERT
    # ======
    # 1. Get the keys of the new elements
    #
    #       news = source - target
    #
    existing_keys = sql.expression.select([target_key_col], from_obj=target)
    source_keys = sql.expression.select([source_key_col], from_obj=source)
    news = source_keys.except_(existing_keys).alias("__news__")

    # 2. Prepare the target columns to be inserted from the source
    #
    column_mapping = OrderedDict()
    column_mapping[target_key] = source_key_col
    for target_name, src_name in mapping.items():
        column_mapping[target_name] = source.columns[src_name]

    # Filter out the source rows using an INNER JOIN with the new keys
    news_condition = source_key_col == news.columns[source_key]
    news_join = source.join(news, news_condition)

    # Label every selected source column with its target column name so the
    # SELECT lines up with the column list given to INSERT ... FROM SELECT.
    columns = [column.label(name) for name, column in column_mapping.items()]
    new_rows = sql.expression.select(columns, from_obj=news_join)

    # Pass a concrete list of names rather than a dict view.
    insert = target.insert().from_select(list(column_mapping), new_rows)

    return (update, insert)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment