Skip to content

Instantly share code, notes, and snippets.

Created June 26, 2015 04:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alpha-beta-soup/6f0fe6a219a014ca9d4d to your computer and use it in GitHub Desktop.
Save alpha-beta-soup/6f0fe6a219a014ca9d4d to your computer and use it in GitHub Desktop.
Psycopg2 Upsert
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import json
def pgsql_upsert(cur, schema, table, primary_keys, **kwargs):
Constructs a piece of SQL to implement a PGSQL upsert for versions of
Postgres < 9.5. (From 9.5+, please use the native
INSERT ... ON CONFLICT ... DO UPDATE syntax instead.)
<schema> and <table> must be strings referring to the schema and table that
are to be subject of the upsert.
<primary_keys> is a list (possibly empty) specifying the names of the columns
that are primary keys, or at least are columns that in combination determine
whether a particular record is new or an update to an existing record.
<**kwargs> should be specified with the column name as the key, and the value
to upsert as the value, in the form of a Python object that psycopg2 can
appropriately mogrify. (Although dictionaries are valid; see note.)
Psycopg2 does not know how to deal with dictionaries. I assume a supplied
dictionary is destined for a column of type JSON, and apply appropriate
type casting to achieve this without raising any error.
There are unresolved issues with psycopg2 incorrectly mogrifying NULL values
for columns of type timestamptz (and possibly other types). E.g. psycopg2 creates
`NULL AS column`
when Postgres *only* accepts
`NULL::timestamptz AS column`
I see no pretty solution to this. (Recommendations appreciated.)
This function does not explicitly defend against all forms of SQL injection,
or otherwise inspect what you throw at it. You should pre-sanitise your own
inputs to an appropriate extent.
There are more reserved words in Postgres than I have controlled for.
It is better that you avoid using a key word for a column name, but if it's
beyond your control or not very convenient to avoid it, you can amend
the list `invalid_column_names` as appropriate.
This has not really accounted for Postgres functions used as part of a
SELECT statement, with the sole exception of ST_GeomFromText, which is the
only function I was using when writing this function. Adding others and
abstracting the process is possible; replicate the pattern within
select_values() to achieve the former.
def sanitise_column_name(column_name):
    """Return *column_name*, double-quoted if it is a known reserved word.

    Only the handful of names listed below are recognised; any other name
    is returned unchanged.
    """
    reserved = ('id', 'order', 'name', 'group', 'reference', 'start', 'end')
    if column_name in reserved:
        # Wrap the whole identifier in double quotes so Postgres treats it
        # as a column name rather than a keyword.
        return '"{}"'.format(column_name)
    return column_name
def select_values(**kwargs):
    """Build the SELECT list that turns the keyword values into a one-row relation.

    Each keyword becomes exactly one ``<expression> AS <column>`` entry:

    * dict values       -> ``%(col)s::json AS col`` (bound later as JSON text)
    * strings containing ``ST_GeomFromText`` -> the string itself is spliced
      into the SQL as an expression (doubled single quotes are collapsed)
    * other strings containing a hyphen -> ``%(col)s::text AS col``
    * everything else   -> ``%(col)s AS col``

    Returns the entries joined with ``',\\n\\t'``; an empty string when no
    keywords are given.

    NOTE(security): the ``ST_GeomFromText`` branch interpolates the caller's
    string directly into the statement. Only pass trusted geometry
    expressions through it.
    """
    # ``unicode`` only exists on Python 2; fall back to ``str`` alone so the
    # check below does not raise NameError on Python 3.
    try:
        text_types = (str, unicode)
    except NameError:
        text_types = (str,)

    entries = []
    for column, value in kwargs.items():
        is_text = isinstance(value, text_types)
        if isinstance(value, dict):
            # psycopg2 cannot adapt a dict by itself; it is sent as JSON text
            # and cast on the server side.
            entries.append('%({k})s::json AS {k}'.format(k=column))
        elif is_text and 'ST_GeomFromText' in value:
            # Function call, not a literal: must not be wrapped in quotes.
            entries.append(
                '{v} AS {k}'.format(v=value, k=column).replace("''", "'")
            )
        elif is_text and '-' in value:
            # Explicit text cast for hyphenated strings (presumably so
            # date-like values keep a definite type -- TODO confirm).
            entries.append('%({k})s::text AS {k}'.format(k=column))
        else:
            # Plain bound parameter. This must be the fallback branch only,
            # otherwise the special cases above would be emitted twice.
            entries.append('%({k})s AS {k}'.format(k=column))
    return ',\n\t'.join(entries)
def set_update(**kwargs):
    """Build the SET list assigning every column from ``upsert_data``.

    The target column on the left is passed through sanitise_column_name;
    the ``upsert_data.<column>`` reference on the right uses the raw name.
    """
    assignments = []
    for column in kwargs.keys():
        target = sanitise_column_name(column)
        assignments.append(
            '{k1} = upsert_data.{k2}'.format(k1=target, k2=column)
        )
    return ',\n\t'.join(assignments)
def pk_where(schema, table, primary_keys):
    """Build the WHERE predicate matching the target row to ``upsert_data``.

    One ``schema.table.pk = upsert_data.pk`` equality is produced per key
    column. The equalities are combined with ``AND``: a WHERE clause takes a
    single boolean expression, so a comma-separated list is a syntax error
    as soon as there is more than one key column.

    Returns an empty string when *primary_keys* is empty.
    """
    conditions = [
        '{}.{}.{} = upsert_data.{}'.format(schema, table, pk, pk)
        for pk in primary_keys
    ]
    return '\n\tAND '.join(conditions)
def columns(**kwargs):
    """Return the parenthesised column list for the INSERT, e.g. ``("id", age)``.

    Every keyword name is passed through sanitise_column_name first.
    """
    quoted = []
    for column in kwargs.keys():
        quoted.append('{k}'.format(k=sanitise_column_name(column)))
    return '(' + ', '.join(quoted) + ')'
def select_columns(**kwargs):
    """Return the SELECT list that reads every column back from ``upsert_data``.

    Each keyword name ``k`` yields ``upsert_data.k AS k``; entries are joined
    with ``',\\n\\t'``.
    """
    entries = []
    for column in kwargs:
        entries.append('upsert_data.{k} AS {k}'.format(k=column))
    return ',\n\t'.join(entries)
def pk_return(primary_keys, upsert_schema=''):
    """Return the key columns as a list for a RETURNING clause.

    *upsert_schema* is an optional qualifier prepended to each key; a
    trailing dot is added when the qualifier does not already end with one.
    ``None`` is treated like the empty string (no qualifier).
    """
    prefix = '' if upsert_schema is None else upsert_schema
    if prefix != '' and not prefix.endswith('.'):
        prefix = prefix + '.'
    qualified = []
    for pk in primary_keys:
        qualified.append('{upsert_schema}{pk}'.format(upsert_schema=prefix, pk=pk))
    return ',\n\t'.join(qualified)
q = '''WITH upsert_data AS (
update_outcome AS (
UPDATE {schema}.{table}
FROM upsert_data
RETURNING 'update'::text AS action,
insert_outcome AS (
INSERT INTO {schema}.{table}
FROM upsert_data
FROM update_outcome LIMIT 1)
RETURNING 'insert'::text AS action,
SELECT * FROM update_outcome UNION ALL SELECT * FROM insert_outcome;
schema = schema,
table = table,
SELECT_VALUES = select_values(**kwargs),
SET_UPDATE = set_update(**kwargs),
PK_WHERE = pk_where(schema, table, primary_keys),
PK_RETURN_UPSERT = pk_return(primary_keys, 'upsert_data'),
PK_RETURN = pk_return(primary_keys),
COLUMNS = columns(**kwargs),
SELECT_COLUMNS = select_columns(**kwargs),
for k, v in kwargs.items():
if type(v) is dict:
# All dictionary values are dumped to JSON
kwargs[k] = str(json.dumps(v))
if __name__ == '__main__':
print cur.mogrify(q,kwargs)
def main():
    """Run a small demonstration of pgsql_upsert against a local database.

    Connects to the database ``test`` as user ``postgres`` and upserts four
    records into ``public.test``; the last one reuses ``id=1`` and therefore
    overwrites the first.

    The calls below assume a table with ``id``, ``name`` and ``age`` columns,
    for example (column types are an assumption -- adjust to your schema):

        CREATE TABLE public.test (
            id integer PRIMARY KEY,
            name text,
            age integer
        );
    """
    username = 'postgres'
    password = 'postgres'
    # Keyword arguments instead of a hand-formatted DSN string, so that
    # credentials containing spaces or quotes cannot break the connection
    # string.
    conn = psycopg2.connect(dbname='test', user=username, password=password)
    try:
        cur = conn.cursor()
        pgsql_upsert(cur, 'public', 'test', ['id'], id=1, name='John', age=25)
        pgsql_upsert(cur, 'public', 'test', ['id'], id=2, name='Shiva', age=32)
        pgsql_upsert(cur, 'public', 'test', ['id'], id=3, name='Tonberry', age=12)
        # Following will overwrite 'John' (id=1)
        pgsql_upsert(cur, 'public', 'test', ['id'], id=1, name='Ifrit', age=205)
        # psycopg2 opens a transaction implicitly; without a commit the
        # changes are discarded when the connection goes away.
        conn.commit()
    finally:
        # Always release the connection, even if one of the upserts fails.
        conn.close()
if __name__ == '__main__':
    # Run sample
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment