@timtadh
Created December 5, 2013 19:14
How to compile an INSERT ... ON DUPLICATE KEY UPDATE with SQL Alchemy with support for a bulk insert.
#!/usr/bin/env python
# Copyright (c) 2012, Tim Henderson
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# - Neither the name of this software nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
This module provides an "Upsert" statement for SQL Alchemy which uses
ON DUPLICATE KEY UPDATE to implement the insert or update semantics. It
supports doing a bulk insert.
"""
import sqlalchemy as sa
import sqlalchemy.sql.expression as expr
from sqlalchemy.ext.compiler import compiles


class Upsert(expr.Insert):
    pass


@compiles(Upsert, "mysql")
def compile_upsert(insert_stmt, compiler, **kwargs):
    # For a bulk insert, the first row decides which columns get updated.
    # list() keeps keys.remove() below working on Python 3 as well.
    if insert_stmt._has_multi_parameters:
        keys = list(insert_stmt.parameters[0].keys())
    else:
        keys = list(insert_stmt.parameters.keys())
    pk = insert_stmt.table.primary_key
    auto = None
    # Detect a single-column, auto-incrementing integer primary key.
    if (len(pk.columns) == 1 and
            isinstance(pk.columns.values()[0].type, sa.Integer) and
            pk.columns.values()[0].autoincrement):
        auto = pk.columns.keys()[0]
        if auto in keys:
            keys.remove(auto)
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = 'ON DUPLICATE KEY UPDATE'
    updates = ', '.join(
        '%s = VALUES(%s)' % (c.name, c.name)
        for c in insert_stmt.table.columns
        if c.name in keys
    )
    if auto is not None:
        # id = LAST_INSERT_ID(id) makes the row id available after an update too.
        last_id = '%s = LAST_INSERT_ID(%s)' % (auto, auto)
        if updates:
            updates = ', '.join((last_id, updates))
        else:
            updates = last_id
    upsert = ' '.join((insert, ondup, updates))
    return upsert
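
To see what compile_upsert appends to the INSERT, the string assembly can be reproduced without SQLAlchemy. This is only a rough sketch of that one step: the function name on_duplicate_clause, its arguments, and the column names in the usage note are hypothetical and are not part of the gist.

```python
def on_duplicate_clause(columns, params, auto_pk=None):
    """Build the ON DUPLICATE KEY UPDATE tail the way compile_upsert does.

    columns -- table column names, in table order (hypothetical input)
    params  -- one dict, or a list of dicts for a bulk insert
    auto_pk -- name of an auto-increment integer primary key, if any
    """
    # A list of dicts means a bulk insert; the first row decides the columns.
    first = params[0] if isinstance(params, (list, tuple)) else params
    # The auto-increment key is never overwritten with VALUES().
    keys = [k for k in first if k != auto_pk]
    updates = ['%s = VALUES(%s)' % (c, c) for c in columns if c in keys]
    if auto_pk is not None:
        # Placed first, as in the gist, so the row id is reported on update.
        updates.insert(0, '%s = LAST_INSERT_ID(%s)' % (auto_pk, auto_pk))
    return 'ON DUPLICATE KEY UPDATE ' + ', '.join(updates)
```

Called as on_duplicate_clause(['id', 'name'], {'id': 1, 'name': 'x'}, auto_pk='id'), the sketch returns a clause that sets id through LAST_INSERT_ID(id) and name through VALUES(name). For a bulk insert only the first dict is inspected, so every row should carry the same keys.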
@Zuckonit

Zuckonit commented Aug 8, 2014

Where is expr defined?

@advance512

import sqlalchemy.sql.expression as expr

@webstar0025

Could you write an example of how to use it?

I tried session.execute(AppPackage.table.upsert(),values) but it is not working.

@l-r

l-r commented Mar 4, 2016

session.execute(Upsert(table, items_list)) should do it.

@simonatdrg

Can you extend the example with code that doesn't use Sessions? I'm not using the ORM API, so I don't think I can use session objects.

Here's a concrete example, using a 'users' table with 3 columns, where id is the primary key:
ins = users.insert().values(name='jack', fullname='Jack Jones', id=999)
res = conn.execute(ins)

I'd like to use Upsert to change that record, say to
(name='snoopy', fullname='clever dog', id=999)

Nothing I've tried so far has worked.

@dovy

dovy commented Nov 18, 2016

I modified this slightly to account for unicode strings and easier imports:

import sqlalchemy
## SQLAlchemy Insert or Update for sessions, w00t!
class Upsert(sqlalchemy.sql.expression.Insert):
    pass
from sqlalchemy.ext.compiler import compiles
@compiles(Upsert, "mysql")
def compile_upsert(insert_stmt, compiler, **kwargs):
    # list() keeps keys.remove() below working on Python 3 as well.
    if insert_stmt._has_multi_parameters:
        keys = list(insert_stmt.parameters[0].keys())
    else:
        keys = list(insert_stmt.parameters.keys())
    pk = insert_stmt.table.primary_key
    auto = None
    if (len(pk.columns) == 1 and
            isinstance(pk.columns.values()[0].type, sqlalchemy.Integer) and
            pk.columns.values()[0].autoincrement):
        auto = pk.columns.keys()[0]
        if auto in keys:
            keys.remove(auto)
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = 'ON DUPLICATE KEY UPDATE'
    updates = ', '.join(
        '{} = VALUES({})'.format(c.name, c.name)
        for c in insert_stmt.table.columns
        if c.name in keys
    )
    if auto is not None:
        last_id = '{} = LAST_INSERT_ID({})'.format(auto, auto)
        if updates:
            updates = ', '.join((last_id, updates))
        else:
            updates = last_id
    upsert = ' '.join((insert, ondup, updates))
    return upsert

Here's a usage example:

from sqlalchemy.pool import NullPool
config['engine'] = sqlalchemy.create_engine(
            'mysql://{}:{}@{}:{}/{}'.format(
                                            config['user'],
                                            config['password'],
                                            config['host'],
                                            config['port'],
                                            config['database']),
            poolclass=NullPool
)
config['metadata'] = sqlalchemy.MetaData(config['engine'])
config['tableObj'] = sqlalchemy.Table(config['table'], config['metadata'], autoload=True)
config['connection'] = config['engine'].connect()
config['connection'].execute(Upsert(config['tableObj'], record))
# You can do multiple executes here over many objects. They won't auto-commit until close().
config['connection'].close()

@sfoley

sfoley commented Feb 8, 2017

That code doesn't take into account any special formatting for column names in the updates section. Something like the following might be an improvement for cross-compatibility:

    # Assumes preparer = compiler.preparer and
    # from sqlalchemy.sql.expression import column
    updates = ', '.join(
        '{} = VALUES({})'.format(preparer.format_column(column(c.name)), preparer.format_column(column(c.name)))
        for c in insert_stmt.table.columns
        if c.name in keys
    )
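
The effect of quoting column names can be illustrated with a hand-rolled helper. This is a minimal sketch and not SQLAlchemy's identifier preparer, which only quotes names that need it. It assumes MySQL's default backtick quoting, and the helper names quote_mysql_identifier and quoted_updates are hypothetical.

```python
def quote_mysql_identifier(name):
    """Wrap a column name in backticks, doubling any backtick inside it."""
    return '`' + name.replace('`', '``') + '`'


def quoted_updates(column_names):
    """Build the 'col = VALUES(col)' list with every name quoted."""
    return ', '.join(
        '{0} = VALUES({0})'.format(quote_mysql_identifier(n))
        for n in column_names
    )
```

With a reserved word such as order as a column name, quoted_updates(['order']) produces an assignment that MySQL can parse, whereas the unquoted form would be a syntax error.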

@jiamo

jiamo commented Sep 5, 2020

How do I get the returned id?

@cdpath

cdpath commented Jun 11, 2021

FYI, I found another gist to do the upsert.

@benbogart

benbogart commented Jun 27, 2021

I'm getting the following error:
AttributeError: 'Upsert' object has no attribute '_has_multi_parameters'

What type is the values object that we pass in? I was trying with a dict.
