-
-
Save altaurog/84668e034646fb354b5de81bb86a580d to your computer and use it in GitHub Desktop.
Test for a different implementation of psycopg executemany
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import itertools | |
import operator | |
import os | |
import sys | |
import time | |
import psycopg2 | |
import pgcopy | |
def paginate(seq, page_size=100): | |
i = iter(seq) | |
while True: | |
page = tuple(itertools.islice(i, 0, page_size)) | |
if len(page): | |
yield page | |
else: | |
return | |
def executemany2(cur, sql, argslist, page_size=100): | |
for page in paginate(argslist, page_size=page_size): | |
sqls = (cur.mogrify(sql, args) for args in page) | |
cur.execute(";".join(sqls)) | |
def executemany3(cur, sql, template, argslist, page_size=100): | |
for page in paginate(argslist, page_size=page_size): | |
insert_batch(cur, sql, template, page) | |
def insert_batch(cur, sql, template, args): | |
argslist = list(args) | |
sql_full = sql + ','.join([template] * len(argslist)) | |
cur.execute(sql_full, reduce(operator.add, argslist)) | |
def data(count): | |
return ((i, "x" * (i % 200)) for i in xrange(count)) | |
@contextlib.contextmanager | |
def test(conn, method, count): | |
with conn.cursor() as cur: | |
cur.execute(""" | |
CREATE TEMPORARY TABLE testmany | |
(id serial primary key, num integer, data text) | |
""") | |
conn.commit() | |
t0 = time.time() | |
yield cur | |
t1 = time.time() | |
cur.execute("SELECT count(*) FROM testmany") | |
assert cur.fetchone()[0] == count | |
cur.execute("DROP TABLE testmany") | |
cur.close() | |
print "%s: %s sec" % (method, (t1 - t0)) | |
if __name__ == '__main__': | |
NRECS = int(sys.argv[1]) | |
conn = psycopg2.connect(os.environ.get("TEST_DSN", "")) | |
with test(conn, 'classic', NRECS) as cur: | |
cur.executemany("insert into testmany (num, data) values (%s, %s)", data(NRECS)) | |
with test(conn, 'joined', NRECS) as cur: | |
executemany2(cur, "insert into testmany (num, data) values (%s, %s)", data(NRECS)) | |
with test(conn, 'folded', NRECS) as cur: | |
executemany3(cur, "insert into testmany (num, data) values ", "(%s, %s)", data(NRECS)) | |
with test(conn, 'pgcopy', NRECS) as cur: | |
mgr = pgcopy.CopyManager(conn, 'testmany', ('num', 'data')) | |
mgr.copy(data(NRECS)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import itertools | |
import operator | |
import os | |
import sys | |
import time | |
import psycopg2 | |
import pgcopy | |
def paginate(seq, page_size=100): | |
i = iter(seq) | |
while True: | |
page = tuple(itertools.islice(i, 0, page_size)) | |
if len(page): | |
yield page | |
else: | |
return | |
def executemany2(cur, sql, argslist, page_size=100): | |
for page in paginate(argslist, page_size=page_size): | |
sqls = (cur.mogrify(sql, args) for args in page) | |
cur.execute(";".join(sqls)) | |
def data_for_insert(count): | |
return ((i, 13, "x" * (1 + i % 200)) for i in xrange(count)) | |
def data_for_update(count): | |
return ((i, 2 * i, "y" * (1 + i % 190)) for i in xrange(count)) | |
@contextlib.contextmanager | |
def test(conn, method, count): | |
with conn.cursor() as cur: | |
cur.execute(""" | |
CREATE TEMPORARY TABLE testmany | |
(id integer primary key, num integer, data text) | |
""") | |
conn.commit() | |
mgr = pgcopy.CopyManager(conn, 'testmany', ('id', 'num', 'data')) | |
mgr.copy(data_for_insert(count)) | |
cur.execute(""" | |
SELECT count(*) FROM testmany | |
WHERE num = 13 | |
AND data LIKE '%x%' | |
""") | |
assert cur.fetchone()[0] == count | |
t0 = time.time() | |
yield cur | |
t1 = time.time() | |
cur.execute(""" | |
SELECT count(*) FROM testmany | |
WHERE num = id * 2 | |
AND data LIKE '%y%' | |
""") | |
assert cur.fetchone()[0] == count | |
cur.execute("DROP TABLE testmany") | |
cur.close() | |
conn.commit() | |
print "%s: %s sec" % (method, (t1 - t0)) | |
if __name__ == '__main__': | |
NRECS = int(sys.argv[1]) | |
conn = psycopg2.connect(os.environ.get("TEST_DSN", "")) | |
with test(conn, 'classic', NRECS) as cur: | |
rotate = operator.itemgetter(1, 2, 0) | |
params = itertools.imap(rotate, data_for_update(NRECS)) | |
cur.executemany(""" | |
UPDATE testmany | |
SET num=%s, data=%s | |
WHERE id=%s | |
""", params) | |
with test(conn, 'joined', NRECS) as cur: | |
rotate = operator.itemgetter(1, 2, 0) | |
params = itertools.imap(rotate, data_for_update(NRECS)) | |
executemany2(cur, """ | |
UPDATE testmany | |
SET num=%s, data=%s | |
WHERE id=%s | |
""", params) | |
with test(conn, 'prepared/joined', NRECS) as cur: | |
cur.execute(""" | |
PREPARE doitupdate (int, int, text) AS | |
UPDATE testmany | |
SET num=$2, data=$3 | |
WHERE id=$1 | |
""") | |
sql = "EXECUTE doitupdate(%s, %s, %s)" | |
executemany2(cur, sql, data_for_update(NRECS)) | |
with test(conn, 'pgcopy', NRECS) as cur: | |
cur.execute(""" | |
CREATE TEMP TABLE testmany_update_temp | |
(LIKE testmany) | |
""") | |
conn.commit() | |
mgr = pgcopy.CopyManager(conn, 'testmany_update_temp', ('id', 'num', 'data')) | |
mgr.copy(data_for_update(NRECS)) | |
cur.execute(""" | |
UPDATE testmany t | |
SET num = s.num, data = s.data | |
FROM testmany_update_temp s | |
WHERE t.id = s.id | |
""") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment