Skip to content

Instantly share code, notes, and snippets.

@pwtail
Last active December 2, 2022 18:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pwtail/e39fb4a6ff6afbb66f082dd053f811bd to your computer and use it in GitHub Desktop.
Save pwtail/e39fb4a6ff6afbb66f082dd053f811bd to your computer and use it in GitHub Desktop.
PostgreSQL benchmark (inserting rows with psycopg2 and asyncpg)
import asyncio
import concurrent
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
import sys
import random
import time
import uvloop
# Install uvloop's event-loop policy as soon as the module is loaded, so the
# asyncio.run() calls made by the command-line dispatch at the bottom of the
# file pick it up (unless a later set_event_loop_policy() call replaces it).
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# --- Schema -----------------------------------------------------------------
# DDL used to (re)create the benchmark table before a run and remove it after.
table = """
CREATE TABLE customer (
id SERIAL NOT NULL,
name VARCHAR(255),
description VARCHAR(255),
q INTEGER,
p INTEGER,
x INTEGER,
y INTEGER,
z INTEGER,
PRIMARY KEY (id)
)
"""
drop = "DROP TABLE IF EXISTS customer"

# --- Workload size ----------------------------------------------------------
# Pool of candidate ids the runners sample from, and the number of statements
# each worker executes.
ids = [*range(10_000)]
tests = 200

# --- Seed rows --------------------------------------------------------------
# One mapping per id. Key order follows the column order of ``insert`` so the
# values can also be passed positionally.
data = [
    {
        "id": n,
        "name": f"c{n}",
        "description": f"c{n}",
        "q": n * 10,
        "p": n * 20,
        "x": n * 30,
        "y": n * 40,
    }
    for n in ids
]

# --- Statements -------------------------------------------------------------
# Both templates use named ``%(key)s`` placeholders. ``insert`` loads a full
# seed row; ``query_`` is the statement the benchmark loops execute, storing
# the sampled id in the ``description`` column.
insert = """
INSERT INTO customer (id, name, description, q, p, x, y) VALUES
(%(id)s, %(name)s, %(description)s, %(q)s, %(p)s, %(x)s, %(y)s)
"""
query_ = """
INSERT INTO customer (name, description, q, p, x, y) VALUES
('id', %(id)s, 10, 20, 30, 40)
"""

# --- Connection -------------------------------------------------------------
# DSN of the database every runner connects to.
url = "postgresql://postgres:postgres@localhost/test"
@contextmanager
def time_log(message):
    """Time the body of a ``with`` block and report the elapsed seconds.

    Yields a dict whose ``'time'`` entry is ``None`` while the body runs and
    is set to the elapsed time (``time.monotonic()`` difference, in seconds)
    when the block exits. The same figure is printed, labelled with
    *message*.

    The measurement is taken in a ``finally`` clause, so the elapsed time is
    still recorded and printed when the body raises; the exception then
    propagates to the caller unchanged.
    """
    result = {'time': None}
    start = time.monotonic()
    try:
        yield result
    finally:
        elapsed = time.monotonic() - start
        result['time'] = elapsed
        print(f"Run {message} in {elapsed} s")
def run_psycopg2():
    """Benchmark single-row INSERTs through psycopg2 from worker threads.

    Recreates the ``customer`` table, then starts 5 threads. Each thread
    opens its own connection, executes ``tests`` INSERT statements with a
    randomly sampled id (committing after every statement) and returns the
    elapsed time reported by ``time_log``. The per-thread times are printed
    and the table is dropped again.
    """
    print("Running psycopg2")
    from contextlib import closing

    import psycopg2

    workers = 5

    # psycopg2's ``with connection:`` block only commits or rolls back the
    # transaction; it does not close the connection. ``closing`` makes sure
    # every connection opened here is released, including on errors.
    with closing(psycopg2.connect(url)) as conn:
        with conn.cursor() as cursor:
            cursor.execute(drop)
            cursor.execute(table)
            # cursor.executemany(insert, data)
        conn.commit()

    def run(_i):
        """Run one worker's share of the benchmark; return its elapsed time."""
        print(f"Insert done. Running {tests} queries")
        to_query = random.choices(ids, k=tests)
        with closing(psycopg2.connect(url)) as conn:
            # Connecting and closing stay outside the timed section.
            with time_log("psycopg2") as res:
                for id_ in to_query:
                    with conn.cursor() as cursor:
                        cursor.execute(query_, {"id": id_})
                        # cursor.fetchall()
                    # One commit per statement, mirroring the per-statement
                    # transaction used by the asyncpg runner.
                    conn.commit()
        return res['time']

    with ThreadPoolExecutor(max_workers=workers) as executor:
        # list() drains the lazy map, i.e. waits for every worker to finish.
        results = list(executor.map(run, range(workers)))
    print(f'Results: {results}')

    with closing(psycopg2.connect(url)) as conn:
        with conn.cursor() as cursor:
            cursor.execute(drop)
        conn.commit()
async def run_asyncpg():
    """Benchmark single-row INSERTs through asyncpg from concurrent tasks.

    Recreates the ``customer`` table, then runs 5 coroutines concurrently
    with ``asyncio.gather``. Each coroutine opens its own connection,
    executes ``tests`` INSERT statements with a randomly sampled id (each in
    its own transaction) and returns the elapsed time reported by
    ``time_log``. The list of times is printed and the table is dropped
    again.

    Every connection is closed in a ``finally`` clause so that a failing
    statement does not leave it open.
    """
    print("Running asyncpg")
    import asyncpg

    workers = 5

    # asyncpg takes positional ``$n`` placeholders, so the ``%(key)s``
    # templates are converted by %-formatting them with the ``$n`` names.
    places = dict(
        id="$1", name="$2", description="$3", q="$4", p="$5", x="$6", y="$7"
    )
    a_insert = insert % places  # only used by the optional bulk load below
    a_select = query_ % {"id": "$1"}

    setup_conn = await asyncpg.connect(url)
    try:
        async with setup_conn.transaction():
            await setup_conn.execute(drop)
            await setup_conn.execute(table)
        # await setup_conn.executemany(a_insert, [tuple(d.values()) for d in data])
    finally:
        await setup_conn.close()

    async def run():
        """Run one task's share of the benchmark; return its elapsed time."""
        print(f"Insert done. Running {tests} queries")
        to_query = random.choices(ids, k=tests)
        conn = await asyncpg.connect(url)
        try:
            # Connecting and closing stay outside the timed section.
            with time_log("asyncpg") as res:
                for id_ in to_query:
                    async with conn.transaction():
                        # ``$1`` lands in the VARCHAR ``description`` column,
                        # hence the id is sent as a string.
                        await conn.execute(a_select, str(id_))
        finally:
            await conn.close()
        return res['time']

    results = await asyncio.gather(*(run() for _ in range(workers)))
    print(results)

    cleanup_conn = await asyncpg.connect(url)
    try:
        async with cleanup_conn.transaction():
            await cleanup_conn.execute(drop)
    finally:
        await cleanup_conn.close()
# Command-line entry point: run the benchmark for every driver named on the
# command line, in the order given.
for name in sys.argv[1:]:
    if name == "psycopg2":
        run_psycopg2()
    elif name == "psycopg":
        # NOTE(review): run_psycopg is not defined in the visible part of this
        # file; selecting this driver raises NameError unless it exists elsewhere.
        run_psycopg()
    elif name == "psycopg_async":
        # Where asyncio offers WindowsSelectorEventLoopPolicy, switch to it
        # before running; this replaces any policy installed earlier.
        if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        # NOTE(review): run_psycopg_async is not defined in the visible part of
        # this file either - confirm before relying on this branch.
        asyncio.run(run_psycopg_async())
    elif name == "asyncpg":
        asyncio.run(run_asyncpg())
    else:
        # Report the argument that was actually not recognised, not always
        # the first command-line argument.
        print("unknown driver", name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment