Skip to content

Instantly share code, notes, and snippets.

@altaurog
Forked from dvarrazzo/exmany.py
Last active February 1, 2017 13:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save altaurog/84668e034646fb354b5de81bb86a580d to your computer and use it in GitHub Desktop.
Save altaurog/84668e034646fb354b5de81bb86a580d to your computer and use it in GitHub Desktop.
Test for a different implementation of psycopg executemany
import contextlib
import itertools
import operator
import os
import sys
import time
import psycopg2
import pgcopy
def paginate(seq, page_size=100):
    """Split ``seq`` into consecutive tuples of at most ``page_size`` items.

    ``seq`` may be any iterable; it is consumed lazily, one page at a time.
    Iteration stops as soon as the source is exhausted, so an empty page is
    never yielded.
    """
    source = iter(seq)
    chunk = tuple(itertools.islice(source, page_size))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(source, page_size))
def executemany2(cur, sql, argslist, page_size=100):
    """Run ``sql`` once per item of ``argslist``, batching the round trips.

    Each page of up to ``page_size`` argument sets is rendered with
    ``cur.mogrify`` and sent to the server as a single ``;``-separated
    command string, so one ``execute`` call carries a whole page.

    :param cur: psycopg2 cursor used to render and execute the statements.
    :param sql: statement with placeholders, as accepted by ``cur.mogrify``.
    :param argslist: iterable of argument sequences, one per statement.
    :param page_size: maximum number of statements joined per ``execute``.
    """
    for page in paginate(argslist, page_size=page_size):
        statements = [cur.mogrify(sql, args) for args in page]
        # mogrify returns a byte string; joining with b";" works on both
        # Python 2 (where bytes is str) and Python 3 (where ";".join on
        # bytes would raise TypeError).
        cur.execute(b";".join(statements))
def executemany3(cur, sql, template, argslist, page_size=100):
    """Insert ``argslist`` in pages, one multi-row statement per page.

    ``argslist`` is split by ``paginate`` into groups of at most
    ``page_size`` rows and each group is handed to ``insert_batch``
    together with the statement prefix ``sql`` and the per-row ``template``.
    """
    pages = paginate(argslist, page_size=page_size)
    for rows in pages:
        insert_batch(cur, sql, template, rows)
def insert_batch(cur, sql, template, args):
    """Insert all rows of ``args`` with a single multi-row statement.

    The statement is ``sql`` followed by ``template`` repeated once per row
    (comma separated); the row values are flattened, in order, into one
    parameter tuple for ``cur.execute``.

    :param cur: cursor whose ``execute`` receives the statement and params.
    :param sql: statement prefix, e.g. ``"insert into t (a, b) values "``.
    :param template: placeholder group for one row, e.g. ``"(%s, %s)"``.
    :param args: iterable of row value sequences; may be a one-shot iterator.
    """
    rows = list(args)
    if not rows:
        # An empty batch would build a statement with no value groups at
        # all; there is nothing to insert, so skip the round trip.
        return
    sql_full = sql + ','.join([template] * len(rows))
    # chain.from_iterable flattens in linear time and exists on both
    # Python 2 and 3, unlike the builtin reduce, which also re-copied the
    # growing tuple on every step.
    params = tuple(itertools.chain.from_iterable(rows))
    cur.execute(sql_full, params)
def data(count):
    """Return a lazy generator of ``count`` ``(num, text)`` sample rows.

    Row ``i`` is ``(i, "x" * (i % 200))``, so the text length cycles
    through 0..199 characters. A non-positive ``count`` produces no rows.
    """
    # islice over count() stays lazy on Python 2 and 3 alike; xrange only
    # exists on Python 2 and range would build a full list there.
    indices = itertools.islice(itertools.count(), max(count, 0))
    return ((i, "x" * (i % 200)) for i in indices)
@contextlib.contextmanager
def test(conn, method, count):
    """Time one insert strategy against a fresh ``testmany`` temp table.

    Creates the table, yields a cursor to the ``with`` body (the timed
    region), then checks that the table holds exactly ``count`` rows, drops
    it and prints the elapsed wall-clock time labelled with ``method``.

    :param conn: open psycopg2 connection.
    :param method: label printed next to the timing.
    :param count: number of rows the body is expected to have inserted.
    :raises AssertionError: if the row count differs from ``count``.
    """
    with conn.cursor() as cur:
        cur.execute(
            "\n"
            "CREATE TEMPORARY TABLE testmany\n"
            "(id serial primary key, num integer, data text)\n"
        )
        conn.commit()

        # Only the caller's with-body is timed; setup and checks are not.
        t0 = time.time()
        yield cur
        t1 = time.time()

        cur.execute("SELECT count(*) FROM testmany")
        inserted = cur.fetchone()[0]
        assert inserted == count, "expected %s rows, found %s" % (count, inserted)
        cur.execute("DROP TABLE testmany")
    # The cursor is closed by the with-block above. Commit so the DROP does
    # not linger in an open transaction when the next run starts.
    conn.commit()
    # Parenthesised single-argument form prints identically on Python 2 and 3.
    print("%s: %s sec" % (method, (t1 - t0)))
if __name__ == '__main__':
    # Usage: script.py NRECS   (connection string taken from $TEST_DSN)
    NRECS = int(sys.argv[1])
    conn = psycopg2.connect(os.environ.get("TEST_DSN", ""))

    single_row_insert = "insert into testmany (num, data) values (%s, %s)"

    # Baseline: psycopg2's own executemany, one statement per row.
    with test(conn, 'classic', NRECS) as cur:
        cur.executemany(single_row_insert, data(NRECS))

    # Pages of mogrified statements joined with ';'.
    with test(conn, 'joined', NRECS) as cur:
        executemany2(cur, single_row_insert, data(NRECS))

    # One multi-row VALUES statement per page.
    with test(conn, 'folded', NRECS) as cur:
        executemany3(
            cur,
            "insert into testmany (num, data) values ",
            "(%s, %s)",
            data(NRECS),
        )

    # Bulk load through pgcopy's CopyManager.
    with test(conn, 'pgcopy', NRECS) as cur:
        mgr = pgcopy.CopyManager(conn, 'testmany', ('num', 'data'))
        mgr.copy(data(NRECS))
import contextlib
import itertools
import operator
import os
import sys
import time
import psycopg2
import pgcopy
def paginate(seq, page_size=100):
    """Yield the items of ``seq`` grouped into tuples of up to ``page_size``.

    The input is read lazily through a single iterator; the generator ends
    when a read returns no items, so no empty tuple is ever produced.
    """
    items = iter(seq)

    def next_page():
        return tuple(itertools.islice(items, page_size))

    # iter(callable, sentinel) keeps calling next_page until it returns ().
    for page in iter(next_page, ()):
        yield page
def executemany2(cur, sql, argslist, page_size=100):
    """Execute ``sql`` for every item of ``argslist`` in batched round trips.

    Argument sets are grouped into pages of at most ``page_size``; each page
    is rendered statement by statement with ``cur.mogrify`` and executed as
    one ``;``-separated command string.

    :param cur: psycopg2 cursor used to render and execute the statements.
    :param sql: statement with placeholders, as accepted by ``cur.mogrify``.
    :param argslist: iterable of argument sequences, one per statement.
    :param page_size: maximum number of statements joined per ``execute``.
    """
    for page in paginate(argslist, page_size=page_size):
        rendered = [cur.mogrify(sql, args) for args in page]
        # mogrify yields byte strings, so the separator must be bytes too:
        # identical on Python 2, and avoids a TypeError on Python 3.
        cur.execute(b";".join(rendered))
def data_for_insert(count):
    """Return a lazy generator of ``count`` seed rows ``(id, num, data)``.

    Row ``i`` is ``(i, 13, "x" * (1 + i % 200))``: ``num`` is always 13 and
    ``data`` is 1..200 ``x`` characters. A non-positive ``count`` produces
    no rows.
    """
    # Lazy on Python 2 and 3 alike; xrange is unavailable on Python 3.
    indices = itertools.islice(itertools.count(), max(count, 0))
    return ((i, 13, "x" * (1 + i % 200)) for i in indices)
def data_for_update(count):
    """Return a lazy generator of ``count`` update rows ``(id, num, data)``.

    Row ``i`` is ``(i, 2 * i, "y" * (1 + i % 190))``: ``num`` is twice the
    id and ``data`` is 1..190 ``y`` characters. A non-positive ``count``
    produces no rows.
    """
    # Lazy on Python 2 and 3 alike; xrange is unavailable on Python 3.
    indices = itertools.islice(itertools.count(), max(count, 0))
    return ((i, 2 * i, "y" * (1 + i % 190)) for i in indices)
@contextlib.contextmanager
def test(conn, method, count):
    """Time one update strategy against a pre-seeded ``testmany`` table.

    Creates the temp table, bulk-loads ``count`` rows from
    ``data_for_insert`` through pgcopy and verifies the seed, then yields a
    cursor to the ``with`` body (the timed region). Afterwards it verifies
    that every row now matches the ``data_for_update`` pattern, drops the
    table, commits and prints the elapsed wall-clock time.

    :param conn: open psycopg2 connection.
    :param method: label printed next to the timing.
    :param count: number of rows to seed and expect updated.
    :raises AssertionError: if the seeded or updated row count is wrong.
    """
    with conn.cursor() as cur:
        cur.execute(
            "\n"
            "CREATE TEMPORARY TABLE testmany\n"
            "(id integer primary key, num integer, data text)\n"
        )
        conn.commit()

        mgr = pgcopy.CopyManager(conn, 'testmany', ('id', 'num', 'data'))
        mgr.copy(data_for_insert(count))
        # Seed rows all have num = 13 and data made of 'x' characters.
        cur.execute(
            "\n"
            "SELECT count(*) FROM testmany\n"
            "WHERE num = 13\n"
            "AND data LIKE '%x%'\n"
        )
        seeded = cur.fetchone()[0]
        assert seeded == count, "expected %s seeded rows, found %s" % (count, seeded)

        # Only the caller's with-body is timed; setup and checks are not.
        t0 = time.time()
        yield cur
        t1 = time.time()

        # Updated rows have num = 2 * id and data made of 'y' characters.
        cur.execute(
            "\n"
            "SELECT count(*) FROM testmany\n"
            "WHERE num = id * 2\n"
            "AND data LIKE '%y%'\n"
        )
        updated = cur.fetchone()[0]
        assert updated == count, "expected %s updated rows, found %s" % (count, updated)
        cur.execute("DROP TABLE testmany")
    # The cursor is closed by the with-block above; no explicit close needed.
    conn.commit()
    # Parenthesised single-argument form prints identically on Python 2 and 3.
    print("%s: %s sec" % (method, (t1 - t0)))
if __name__ == '__main__':
    # Usage: script.py NRECS   (connection string taken from $TEST_DSN)
    NRECS = int(sys.argv[1])
    conn = psycopg2.connect(os.environ.get("TEST_DSN", ""))

    # data_for_update yields (id, num, data); the UPDATE placeholders are
    # ordered (num, data, id), so each row is rotated before binding.
    rotate = operator.itemgetter(1, 2, 0)
    update_sql = (
        "\n"
        "UPDATE testmany\n"
        "SET num=%s, data=%s\n"
        "WHERE id=%s\n"
    )

    # Baseline: psycopg2's own executemany, one statement per row.
    with test(conn, 'classic', NRECS) as cur:
        # A generator expression is lazy like itertools.imap, which does
        # not exist on Python 3.
        params = (rotate(row) for row in data_for_update(NRECS))
        cur.executemany(update_sql, params)

    # Pages of mogrified statements joined with ';'.
    with test(conn, 'joined', NRECS) as cur:
        params = (rotate(row) for row in data_for_update(NRECS))
        executemany2(cur, update_sql, params)

    # Same batching, but each statement is an EXECUTE of a prepared UPDATE.
    with test(conn, 'prepared/joined', NRECS) as cur:
        cur.execute(
            "\n"
            "PREPARE doitupdate (int, int, text) AS\n"
            "UPDATE testmany\n"
            "SET num=$2, data=$3\n"
            "WHERE id=$1\n"
        )
        sql = "EXECUTE doitupdate(%s, %s, %s)"
        # The prepared statement takes (id, num, data), so no rotation here.
        executemany2(cur, sql, data_for_update(NRECS))

    # Bulk-load new values into a staging table, then one UPDATE ... FROM.
    with test(conn, 'pgcopy', NRECS) as cur:
        cur.execute(
            "\n"
            "CREATE TEMP TABLE testmany_update_temp\n"
            "(LIKE testmany)\n"
        )
        conn.commit()
        mgr = pgcopy.CopyManager(conn, 'testmany_update_temp', ('id', 'num', 'data'))
        mgr.copy(data_for_update(NRECS))
        cur.execute(
            "\n"
            "UPDATE testmany t\n"
            "SET num = s.num, data = s.data\n"
            "FROM testmany_update_temp s\n"
            "WHERE t.id = s.id\n"
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment