Skip to content

Instantly share code, notes, and snippets.

@alpha-beta-soup
Created June 26, 2015 04:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alpha-beta-soup/6f0fe6a219a014ca9d4d to your computer and use it in GitHub Desktop.
Save alpha-beta-soup/6f0fe6a219a014ca9d4d to your computer and use it in GitHub Desktop.
Psycopg2 Upsert
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import json
def pgsql_upsert(cur, schema, table, primary_keys, **kwargs):
    """
    Construct and execute a piece of SQL implementing a PGSQL upsert for
    versions of Postgres < 9.5. (From 9.5+, please use the INSERT ... ON
    CONFLICT ... DO UPDATE construction: https://wiki.postgresql.org/wiki/UPSERT)

    <cur> is a psycopg2 cursor; the generated query is run with cur.execute().
    The query ends with a SELECT yielding rows of (action, <primary keys>...),
    where action is the text 'update' or 'insert'.

    <schema> and <table> must be strings referring to the schema and table that
    are to be subject of the upsert.

    <primary_keys> is a list (possibly empty) specifying the names of the
    columns that are primary keys, or at least are columns which in combination
    determine if a particular record is new or an update to an existing record.
    Several keys are combined with AND. With an empty list no existing record
    can be matched, so the row is always inserted.

    <**kwargs> should be specified with the column name as the key, and the
    value to upsert as the value, in the form of a Python object that psycopg2
    can appropriately mogrify. (Although dictionaries are valid; see note.)

    When this module is run as a script, the mogrified query is printed before
    it is executed.

    NOTE:
    Psycopg2 does not know how to deal with dictionaries. I assume a supplied
    dictionary (or dict subclass) is destined for a column of type JSON: it is
    serialised with json.dumps and its placeholder is cast to ::json.

    NOTE:
    There are unresolved issues with psycopg2 incorrectly mogrifying NULL values
    for columns of timestamptz (and possibly other types). E.g. psycopg2 creates
    `NULL AS column`
    when Postgres *only* accepts
    `NULL::timestamptz AS column`
    I see no pretty solution to this. (Recommendation appreciated.)

    NOTE:
    This function does not explicitly defend against all forms of SQL injection,
    or otherwise inspect what you throw at it. Schema, table and column names
    and ST_GeomFromText expressions are pasted into the SQL verbatim. You should
    pre-sanitise your own inputs to an appropriate extent.

    NOTE:
    There are more reserved words in Postgres than I have controlled for.
    (See http://www.postgresql.org/docs/7.3/static/sql-keywords-appendix.html)
    It is better that you avoid using a key word for a column name, but if it's
    beyond your control or not very convenient to avoid it, you can amend
    the list `invalid_column_names` as appropriate.

    NOTE:
    This has not really accounted for Postgres functions used as part of a
    SELECT statement, with the sole exception of ST_GeomFromText, which is the
    only function I was using when writing this function. Adding others and
    abstracting the process is possible; replicate the pattern within
    select_values() to achieve the former.
    """
    # Python 2 has separate byte (str) and text (unicode) types; Python 3 only
    # has str. Resolve the string types once so the checks below work on both.
    try:
        string_types = (str, unicode)  # noqa: F821 -- Python 2 only
    except NameError:
        string_types = (str,)

    # Fix the column order once. The INSERT column list and the SELECT list
    # must line up, so every helper below iterates this same list.
    column_names = list(kwargs)

    def sanitise_column_name(column_name):
        # Double-quote column names that clash with SQL key words.
        invalid_column_names = ('id', 'order', 'name', 'group', 'reference', 'start', 'end')
        if column_name in invalid_column_names:
            return '"{}"'.format(column_name)
        return column_name

    def select_values():
        # Has to do a little bit of messy work for the ST_GeomFromText function
        # in an insert statement, and for hyphens in text/unicode values.
        s = []
        for k in column_names:
            v = kwargs[k]
            if isinstance(v, dict):
                # Psycopg2 can't adapt a dict; it is sent as a JSON string
                # (see the params loop below) and cast back to json here.
                s.append('%({k})s::json AS {k}'.format(k=k))
            elif isinstance(v, string_types) and 'ST_GeomFromText' in v:
                # Inline the SQL expression verbatim (collapsing doubled single
                # quotes) instead of passing it as a quoted parameter.
                # WARNING: this bypasses parameterisation; the value must be trusted.
                s.append('{v} AS {k}'.format(v=v, k=k).replace("''", "'"))
            elif isinstance(v, string_types) and '-' in v:
                # Explicit ::text cast for hyphenated strings; presumably so
                # Postgres does not infer another type for them -- TODO confirm.
                s.append('%({k})s::text AS {k}'.format(k=k))
            else:
                s.append('%({k})s AS {k}'.format(k=k))
        return ',\n\t'.join(s)

    def set_update():
        return ',\n\t'.join(['{k1} = upsert_data.{k2}'.format(k1=sanitise_column_name(k), k2=k)
                             for k in column_names])

    def pk_where():
        # With no keys nothing can identify an existing row, so update nothing.
        if not primary_keys:
            return 'FALSE'
        # Composite keys must ALL match, hence AND (a comma is invalid here).
        return '\n\tAND '.join(['{}.{}.{} = upsert_data.{}'.format(schema, table, pk, pk)
                                for pk in primary_keys])

    def columns():
        return '({cols})'.format(cols=', '.join([sanitise_column_name(k) for k in column_names]))

    def select_columns():
        return ',\n\t'.join(['upsert_data.{k} AS {k}'.format(k=k) for k in column_names])

    # Extra RETURNING columns, each with a leading comma, so an empty
    # primary_keys list leaves "RETURNING ... AS action" valid. Qualified names
    # (upsert_data.x) accept key words; bare names must be quoted.
    pk_return_upsert = ''.join([',\n\tupsert_data.{}'.format(pk) for pk in primary_keys])
    pk_return = ''.join([',\n\t{}'.format(sanitise_column_name(pk)) for pk in primary_keys])

    q = '''WITH upsert_data AS (
SELECT
\t{SELECT_VALUES}
),
update_outcome AS (
UPDATE {schema}.{table}
SET
\t{SET_UPDATE}
FROM upsert_data
WHERE
\t{PK_WHERE}
RETURNING 'update'::text AS action{PK_RETURN_UPSERT}
),
insert_outcome AS (
INSERT INTO {schema}.{table}
{COLUMNS}
SELECT
\t{SELECT_COLUMNS}
FROM upsert_data
WHERE NOT EXISTS (SELECT 1 FROM update_outcome LIMIT 1)
RETURNING 'insert'::text AS action{PK_RETURN}
)
SELECT * FROM update_outcome UNION ALL SELECT * FROM insert_outcome;
'''.format(
        schema=schema,
        table=table,
        SELECT_VALUES=select_values(),
        SET_UPDATE=set_update(),
        PK_WHERE=pk_where(),
        PK_RETURN_UPSERT=pk_return_upsert,
        PK_RETURN=pk_return,
        COLUMNS=columns(),
        SELECT_COLUMNS=select_columns(),
    )

    # Build the query parameters without mutating the caller's values:
    # dictionaries are dumped to JSON text, everything else passes through.
    params = {}
    for k in column_names:
        v = kwargs[k]
        params[k] = json.dumps(v) if isinstance(v, dict) else v

    if __name__ == '__main__':
        # Debug output when the module is run as a script.
        print(cur.mogrify(q, params))
    cur.execute(q, params)
def main():
    '''
    Run a small demonstration of pgsql_upsert against a local database named
    'test', using the sample table below.

    Sample table:
    CREATE TABLE public.test (
        id INTEGER PRIMARY KEY,
        "name" TEXT,
        age INTEGER
    );
    '''
    username = 'postgres'
    password = 'postgres'
    # Keyword arguments avoid hand-quoting a libpq connection string, which
    # breaks when a value contains spaces or quotes.
    conn = psycopg2.connect(dbname='test', user=username, password=password)
    try:
        cur = conn.cursor()
        try:
            pgsql_upsert(cur, 'public', 'test', ['id'], id=1, name='John', age=25)
            pgsql_upsert(cur, 'public', 'test', ['id'], id=2, name='Shiva', age=32)
            conn.commit()
            pgsql_upsert(cur, 'public', 'test', ['id'], id=3, name='Tonberry', age=12)
            # The following overwrites 'John' (id=1)
            pgsql_upsert(cur, 'public', 'test', ['id'], id=1, name='Ifrit', age=205)
            conn.commit()
        finally:
            cur.close()
    finally:
        # Always release the connection, even if an upsert fails.
        conn.close()


if __name__ == '__main__':
    # Run sample
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment