-
-
Save alpha-beta-soup/6f0fe6a219a014ca9d4d to your computer and use it in GitHub Desktop.
Psycopg2 Upsert
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import psycopg2 | |
import json | |
def pgsql_upsert(cur, schema, table, primary_keys, **kwargs):
    """
    Build and execute a single-row upsert for Postgres versions < 9.5.

    (From 9.5+, please use the INSERT ... ON CONFLICT DO UPDATE
    construction instead: https://wiki.postgresql.org/wiki/UPSERT)

    The statement is a writable CTE. It UPDATEs rows of <schema>.<table>
    whose <primary_keys> columns equal the supplied values. Only if that
    UPDATE returned nothing does it INSERT the supplied values as a new row.

    The statement returns an ``action`` column ('update' or 'insert'),
    followed by the primary-key columns. The result is left on <cur> for the
    caller to fetch. Nothing is committed here.

    <cur> is a psycopg2 cursor on which the statement is executed.

    <schema> and <table> must be strings naming the schema and table that are
    the subject of the upsert. They are interpolated into the SQL verbatim.

    <primary_keys> is a non-empty list naming the columns that are primary
    keys, or at least columns which in combination determine whether a
    record is new or an update to an existing record. Every name in it must
    also be supplied in <kwargs>.

    <**kwargs> maps each column name to the value to upsert. Values should be
    Python objects that psycopg2 can appropriately mogrify. (Dictionaries are
    also accepted; see note.)

    Raises ValueError if <kwargs> is empty, if <primary_keys> is empty, or if
    a primary-key column has no value in <kwargs>. Each of these would
    otherwise produce invalid SQL.

    NOTE:
    Psycopg2 does not know how to adapt dictionaries. A supplied dict (or
    dict subclass) is assumed to be destined for a JSON column. It is
    serialised with json.dumps and cast with ::json.

    NOTE:
    There are unresolved issues with NULL values for columns of type
    timestamptz (and possibly other types). E.g. psycopg2 creates
        `NULL AS column`
    when Postgres *only* accepts
        `NULL::timestamptz AS column`
    I see no pretty solution to this. (Recommendations appreciated.)

    NOTE:
    This function does not defend against SQL injection. <schema>, <table>,
    column names, and string values containing 'ST_GeomFromText' are all
    inlined into the SQL verbatim; only the other values are passed as query
    parameters. Pre-sanitise your own inputs to an appropriate extent.

    NOTE:
    There are more reserved words in Postgres than are controlled for here.
    (See https://www.postgresql.org/docs/current/sql-keywords-appendix.html)
    It is better to avoid using a key word as a column name. If that is
    beyond your control, amend the `invalid_column_names` tuple below; names
    in it are double-quoted wherever they are used as bare identifiers.

    NOTE:
    Postgres functions used in the SELECT list are not generally supported;
    the sole exception is ST_GeomFromText. Others can be supported by
    replicating that branch of the value-rendering loop below.

    NOTE(review):
    Like other CTE-based upserts described on the wiki page above, this is
    presumably not safe against concurrent upserts of the same key without
    additional locking. Confirm for your workload.
    """
    import json

    # Column names that are double-quoted wherever they appear as bare
    # identifiers (most are Postgres key words). Amend as required.
    invalid_column_names = ('id', 'order', 'name', 'group', 'reference',
                            'start', 'end')

    # Text types: (str, unicode) on Python 2; (str, str) on Python 3, where
    # the `unicode` builtin no longer exists.
    string_types = (str, type(u''))

    def sanitise_column_name(column_name):
        # Double-quote a column name only if it exactly matches a listed name.
        if column_name in invalid_column_names:
            return '"{}"'.format(column_name)
        return column_name

    if not kwargs:
        raise ValueError('pgsql_upsert: no column values supplied')
    if not primary_keys:
        raise ValueError('pgsql_upsert: primary_keys must not be empty')
    missing = [pk for pk in primary_keys if pk not in kwargs]
    if missing:
        raise ValueError(
            'pgsql_upsert: no value supplied for primary key column(s): '
            + ', '.join(missing))

    # Fix the column order once. The INSERT column list and the SELECT list
    # feeding it must name the columns in exactly the same order.
    column_names = list(kwargs)

    select_values = []
    for k in column_names:
        v = kwargs[k]
        if isinstance(v, dict):
            # The dict is sent as JSON text (see params below) and cast here.
            select_values.append('%({k})s::json AS {k}'.format(k=k))
        elif isinstance(v, string_types) and 'ST_GeomFromText' in v:
            # Inline the function call verbatim so it is evaluated rather
            # than quoted as a string literal. Doubled single quotes are
            # collapsed back to single quotes.
            select_values.append(
                '{v} AS {k}'.format(v=v, k=k).replace("''", "'"))
        elif isinstance(v, string_types) and '-' in v:
            # NOTE(review): hyphenated strings get an explicit ::text cast.
            # An earlier comment mentioned E'' escape strings, which are not
            # used here. Confirm the cast is still needed.
            select_values.append('%({k})s::text AS {k}'.format(k=k))
        else:
            select_values.append('%({k})s AS {k}'.format(k=k))

    sep = ',\n\t'
    set_update = sep.join(
        '{} = upsert_data.{}'.format(sanitise_column_name(k), k)
        for k in column_names)
    # Composite keys must be ANDed together; comma-joining them is invalid SQL.
    pk_where = '\n\tAND '.join(
        '{}.{}.{} = upsert_data.{}'.format(schema, table, pk, pk)
        for pk in primary_keys)
    pk_return_upsert = sep.join(
        'upsert_data.{}'.format(pk) for pk in primary_keys)
    # Bare primary-key names (not qualified by a table) need quoting if they
    # are listed key words.
    pk_return = sep.join(sanitise_column_name(pk) for pk in primary_keys)
    columns = '({})'.format(
        ', '.join(sanitise_column_name(k) for k in column_names))
    select_columns = sep.join(
        'upsert_data.{k} AS {k}'.format(k=k) for k in column_names)

    q = '''WITH upsert_data AS (
        SELECT
        {SELECT_VALUES}
    ),
    update_outcome AS (
        UPDATE {schema}.{table}
        SET
        {SET_UPDATE}
        FROM upsert_data
        WHERE
        {PK_WHERE}
        RETURNING 'update'::text AS action,
        {PK_RETURN_UPSERT}
    ),
    insert_outcome AS (
        INSERT INTO {schema}.{table}
        {COLUMNS}
        SELECT
        {SELECT_COLUMNS}
        FROM upsert_data
        WHERE NOT EXISTS (SELECT
        {PK_RETURN}
        FROM update_outcome LIMIT 1)
        RETURNING 'insert'::text AS action,
        {PK_RETURN}
    )
    SELECT * FROM update_outcome UNION ALL SELECT * FROM insert_outcome;
    '''.format(
        schema=schema,
        table=table,
        SELECT_VALUES=sep.join(select_values),
        SET_UPDATE=set_update,
        PK_WHERE=pk_where,
        PK_RETURN_UPSERT=pk_return_upsert,
        PK_RETURN=pk_return,
        COLUMNS=columns,
        SELECT_COLUMNS=select_columns,
    )

    # Query parameters: dict values are dumped to JSON text for the ::json cast.
    params = {}
    for k in column_names:
        v = kwargs[k]
        params[k] = json.dumps(v) if isinstance(v, dict) else v

    if __name__ == '__main__':
        # When this file is run as a script, echo the interpolated SQL.
        print(cur.mogrify(q, params))
    cur.execute(q, params)
def main(username='postgres', password='postgres', dbname='test'):
    '''
    Demonstrate pgsql_upsert against a sample table.

    Sample table (create it before running):

        CREATE TABLE public.test (
            id INTEGER PRIMARY KEY,
            "name" TEXT,
            age INTEGER
        );

    Upserts ids 1-3, then upserts id=1 again with new values so that row is
    updated in place rather than duplicated.

    :param username: Postgres role to connect as (default 'postgres').
    :param password: password for <username> (default 'postgres').
    :param dbname: database containing public.test (default 'test').
    '''
    # Keyword arguments let psycopg2 build the connection string itself, so
    # credentials containing spaces or quotes cannot corrupt it.
    conn = psycopg2.connect(dbname=dbname, user=username, password=password)
    try:
        cur = conn.cursor()
        pgsql_upsert(cur, 'public', 'test', ['id'], id=1, name='John', age=25)
        pgsql_upsert(cur, 'public', 'test', ['id'], id=2, name='Shiva', age=32)
        conn.commit()
        pgsql_upsert(cur, 'public', 'test', ['id'], id=3, name='Tonberry', age=12)
        # The following overwrites 'John' (id=1) instead of inserting a duplicate.
        pgsql_upsert(cur, 'public', 'test', ['id'], id=1, name='Ifrit', age=205)
        conn.commit()
    finally:
        # Always release the connection, even if an upsert fails; psycopg2
        # discards any uncommitted work on close.
        conn.close()
if __name__ == "__main__":
    # Only when this file is executed directly (never on import):
    # run the sample upserts shown in main().
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment