Created
March 22, 2021 16:39
-
-
Save andy-slac/b55d33c6dd359e699e9d5425cabd4f3c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
from contextlib import contextmanager | |
import datetime | |
import os | |
import time | |
import uuid | |
from sqlalchemy import create_engine, TypeDecorator, CHAR, MetaData, Table, Column, BigInteger, DateTime, Integer | |
from sqlalchemy.dialects.postgresql import UUID | |
from sqlalchemy.sql import functions, select | |
# Convenience aliases for common connection URLs; any other value on the
# command line is passed through to SQLAlchemy unchanged.
shortcuts = {"sqlite": "sqlite:///test.db",
             "postgres": "postgresql+psycopg2://lsst-pg-prod1.ncsa.illinois.edu:5432/lsstdb1"}
# Fixed namespace for uuid5 mode so that generated IDs are reproducible
# across runs (uuid5 is deterministic for a given namespace + name).
uuid_namespace = uuid.UUID('e56b11b8-8905-11eb-ac8e-ecf4bbeb04a4')
class GUID(TypeDecorator):
    """Platform-independent GUID type.

    Uses PostgreSQL's native UUID type; on other backends the value is
    stored as a 32-character stringified hex value in a CHAR(32) column.
    """

    impl = CHAR
    # The type renders the same way regardless of instance state, so it is
    # safe for SQLAlchemy's statement-cache (avoids cache warnings in 1.4+).
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the backend-specific column type for this dialect."""
        if dialect.name == 'postgresql':
            return dialect.type_descriptor(UUID())
        return dialect.type_descriptor(CHAR(32))

    def process_bind_param(self, value, dialect):
        """Convert a ``uuid.UUID`` (or UUID string) to its storage form.

        ``None`` passes through unchanged (NULL).
        """
        if value is None:
            return value
        if dialect.name == 'postgresql':
            # PostgreSQL UUID type accepts the canonical string form.
            return str(value)
        if not isinstance(value, uuid.UUID):
            # Accept a UUID given as a string and normalize it first.
            value = uuid.UUID(value)
        # Zero-padded 32-char hex string without dashes, for CHAR(32).
        return "%.32x" % value.int

    def process_result_value(self, value, dialect):
        """Convert a stored value back to a ``uuid.UUID`` (None stays None)."""
        if value is None:
            return value
        if not isinstance(value, uuid.UUID):
            value = uuid.UUID(value)
        return value
def main():
    """Insert test rows into a ``testuuid`` table and time batched inserts.

    Compares an integer+site_id composite primary key against uuid1/uuid4/
    uuid5 primary keys; optionally appends timing metrics (InfluxDB line
    protocol) to a file via :class:`MetricsCollector`.
    """
    parser = argparse.ArgumentParser(description="Testing sqlalchemy RETURNING support")
    # BUGFIX: nargs="?" makes the positional truly optional; without it
    # argparse treats the positional as required and silently ignores
    # the `default`, so running with no arguments would error out.
    parser.add_argument("backend", nargs="?", default="sqlite",
                        help="SQLAlchemy connection URL, or one of sqlite|postgres")
    parser.add_argument("--drop", dest="drop", default=False, action="store_true",
                        help="Drop tables if they exist.")
    parser.add_argument("--echo", dest="echo", default=0, action="store_const", const=1,
                        help="Set echo flag for sqlalchemy.")
    parser.add_argument("-n", dest="count", default=1000, type=int,
                        help="Number of records to insert, def: %(default)s.")
    parser.add_argument("-b", dest="batch", default=100, type=int,
                        help="Number of records in batch, def: %(default)s.")
    parser.add_argument("-m", "--mode", dest="mode", default="site_id",
                        choices=["site_id", "uuid1", "uuid4", "uuid5"],
                        help="Testing mode, def: %(default)s.")
    parser.add_argument("--metrics", dest="metrics", default=None, metavar="PATH",
                        help="File name for metrics, file is open in append mode.")
    parser.add_argument("--tags", dest="tags", default=None, metavar="KEY=VAL",
                        help="Extra tags to add to each metrics.")
    args = parser.parse_args()

    mcoll = MetricsCollector(args.metrics, args.mode, args.tags)

    engine = create_engine(shortcuts.get(args.backend, args.backend), echo=args.echo)
    metadata = MetaData()

    # Primary-key layout depends on the test mode.
    if args.mode == "site_id":
        columns = [
            Column("id", BigInteger, primary_key=True),
            Column("site_id", Integer, primary_key=True),
        ]
    else:
        columns = [
            Column("id", GUID, primary_key=True),
        ]
    columns += [
        Column("dataset_type_id", BigInteger),
        Column("ingest_date", DateTime),
        Column("run_id", BigInteger),
    ]
    table = Table("testuuid", metadata, *columns)
    if args.drop:
        metadata.drop_all(engine)
    metadata.create_all(engine)

    with engine.connect() as connection:
        start_id = 0
        if args.mode == "site_id":
            # Continue the integer ID sequence after rows that survived a
            # previous run, to avoid primary-key collisions.
            q = select([functions.count(table.columns.id)])
            res = connection.execute(q)
            row = res.fetchone()
            start_id = row[0]

        query = table.insert()
        rows = []
        for i in range(start_id, start_id + args.count):
            data = dict(
                dataset_type_id=100,
                ingest_date=datetime.datetime.now(),
                run_id=1000,
            )
            if args.mode == "site_id":
                data["id"] = i
                data["site_id"] = 42
            elif args.mode == "uuid1":
                data["id"] = uuid.uuid1()
            elif args.mode == "uuid4":
                data["id"] = uuid.uuid4()
            elif args.mode == "uuid5":
                # Deterministic UUID derived from the row index.
                data["id"] = uuid.uuid5(uuid_namespace, f"id = {i}")
            rows.append(data)
            # Flush a full batch inside a single timed transaction.
            if len(rows) == args.batch:
                with mcoll.insert(len(rows), i + 1), connection.begin():
                    connection.execute(query, rows)
                rows = []
        if rows:
            # Final partial batch (count not a multiple of batch size).
            with mcoll.insert(len(rows), i + 1), connection.begin():
                connection.execute(query, rows)

    mcoll.close()
class MetricsCollector:
    """Appends per-batch insert timings to a file in InfluxDB line protocol.

    If ``path`` is falsy the collector is a no-op: ``insert`` still works
    as a context manager but records nothing.
    """

    def __init__(self, path, mode, tags):
        """
        Parameters
        ----------
        path : `str` or `None`
            Metrics file name, opened in append mode; `None` disables
            metrics collection entirely.
        mode : `str`
            Value for the mandatory ``mode`` tag on every record.
        tags : `str` or `None`
            Extra comma-separated ``key=val`` tags appended to each record.
        """
        self.fd = None  # raw OS file descriptor, or None when disabled/closed
        if path:
            self.fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o666)
        self.tags = f"mode={mode}"
        if tags:
            self.tags += "," + tags

    def close(self):
        """Close the metrics file; safe to call more than once."""
        if self.fd is not None:
            os.close(self.fd)
            # BUGFIX: reset so a second close() does not double-close the
            # (possibly reused) file descriptor and raise OSError.
            self.fd = None

    @contextmanager
    def insert(self, n_rows, total_rows):
        """Context manager timing one insert batch and recording a metric.

        Parameters
        ----------
        n_rows : `int`
            Number of rows in this batch.
        total_rows : `int`
            Running total of rows inserted so far (including this batch).
        """
        if self.fd is None:
            # Metrics disabled: just run the body.
            yield
            return
        t0 = time.time()
        yield
        t1 = time.time()
        # Line protocol timestamps are in nanoseconds.
        ts = int(t1 * 1e9)
        metr = f"insert_time,{self.tags} value={t1-t0},count={n_rows},total_count={total_rows} {ts}\n"
        os.write(self.fd, metr.encode())
# Script entry point: run the benchmark when executed directly.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment