Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env python
import argparse
from contextlib import contextmanager
import datetime
import os
import time
import uuid
from sqlalchemy import create_engine, TypeDecorator, CHAR, MetaData, Table, Column, BigInteger, DateTime, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.sql import functions, select
shortcuts = {"sqlite": "sqlite:///test.db",
"postgres": "postgresql+psycopg2://lsst-pg-prod1.ncsa.illinois.edu:5432/lsstdb1"}
uuid_namespace = uuid.UUID('e56b11b8-8905-11eb-ac8e-ecf4bbeb04a4')
class GUID(TypeDecorator):
"""Platform-independent GUID type.
Uses PostgreSQL's UUID type, otherwise uses
CHAR(32), storing as stringified hex values.
"""
impl = CHAR
def load_dialect_impl(self, dialect):
if dialect.name == 'postgresql':
return dialect.type_descriptor(UUID())
else:
return dialect.type_descriptor(CHAR(32))
def process_bind_param(self, value, dialect):
if value is None:
return value
elif dialect.name == 'postgresql':
return str(value)
else:
if not isinstance(value, uuid.UUID):
return "%.32x" % uuid.UUID(value).int
else:
# hexstring
return "%.32x" % value.int
def process_result_value(self, value, dialect):
if value is None:
return value
else:
if not isinstance(value, uuid.UUID):
value = uuid.UUID(value)
return value
def main():
parser = argparse.ArgumentParser(description="Testing sqlalchemy RETURNING support")
parser.add_argument("backend", default="sqlite",
help="SQLAlchemy connection URL, or one of sqlite|postgres")
parser.add_argument("--drop", dest="drop", default=False, action="store_true",
help="Drop tables if they exists.")
parser.add_argument("--echo", dest="echo", default=0, action="store_const", const=1,
help="Set echo flag for sqlalchemy.")
parser.add_argument("-n", dest="count", default=1000, type=int,
help="Number of records to insert, def: %(default)s.")
parser.add_argument("-b", dest="batch", default=100, type=int,
help="Number of records in batch, def: %(default)s.")
parser.add_argument("-m", "--mode", dest="mode", default="site_id",
choices=["site_id", "uuid1", "uuid4", "uuid5"],
help="Testing mode, def: %(default)s.")
parser.add_argument("--metrics", dest="metrics", default=None, metavar="PATH",
help="File name for metrics, file is open in append mode.")
parser.add_argument("--tags", dest="tags", default=None, metavar="KEY=VAL",
help="Extra tags to add to each metrics.")
args = parser.parse_args()
mcoll = MetricsCollector(args.metrics, args.mode, args.tags)
engine = create_engine(shortcuts.get(args.backend, args.backend), echo=args.echo)
metadata = MetaData()
if args.mode == "site_id":
columns = [
Column("id", BigInteger, primary_key=True),
Column("site_id", Integer, primary_key=True),
]
else:
columns = [
Column("id", GUID, primary_key=True),
]
columns += [
Column("dataset_type_id", BigInteger),
Column("ingest_date", DateTime),
Column("run_id", BigInteger),
]
table = Table("testuuid", metadata, *columns)
if args.drop:
metadata.drop_all(engine)
metadata.create_all(engine)
with engine.connect() as connection:
start_id = 0
if args.mode == "site_id":
q = select([functions.count(table.columns.id)])
res = connection.execute(q)
row = res.fetchone()
start_id = row[0]
query = table.insert()
rows = []
for i in range(start_id, start_id + args.count):
data = dict(
dataset_type_id=100,
ingest_date=datetime.datetime.now(),
run_id=1000
)
if args.mode == "site_id":
data["id"] = i
data["site_id"] = 42
elif args.mode == "uuid1":
data["id"] = uuid.uuid1()
elif args.mode == "uuid4":
data["id"] = uuid.uuid4()
elif args.mode == "uuid5":
data["id"] = uuid.uuid5(uuid_namespace, f"id = {i}")
rows.append(data)
if len(rows) == args.batch:
with mcoll.insert(len(rows), i+1), connection.begin():
connection.execute(query, rows)
rows = []
if rows:
with mcoll.insert(len(rows), i+1), connection.begin():
connection.execute(query, rows)
mcoll.close()
class MetricsCollector:
def __init__(self, path, mode, tags):
self.fd = None
if path:
self.fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o666)
self.tags = f"mode={mode}"
if tags:
self.tags += "," + tags
def close(self):
if self.fd is not None:
os.close(self.fd)
@contextmanager
def insert(self, n_rows, total_rows):
if self.fd is not None:
t0 = time.time()
yield
t1 = time.time()
ts = int(t1 * 1e9)
metr = f"insert_time,{self.tags} value={t1-t0},count={n_rows},total_count={total_rows} {ts}\n"
os.write(self.fd, metr.encode())
else:
yield
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment