Created
July 31, 2023 20:43
-
-
Save DailyDreaming/813b5b2dd2e058e3a280b4ccc754d811 to your computer and use it in GitHub Desktop.
Script to query UCSC SQL database.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2006-2022 Mark Diekhans
# Modified by Lon Blauvelt 2023
"""Operations for accessing mysql"""
import warnings | |
import time | |
import MySQLdb # mysqlclient is required for python 3 | |
from MySQLdb.cursors import DictCursor # noqa: F401 | |
from MySQLdb.cursors import Cursor | |
import MySQLdb.converters | |
_mySqlErrorOnWarnDone = False


def mySqlSetErrorOnWarn():
    """Promote MySQLdb warnings to errors, except for a few benign ones.

    Warnings whose message reports an unknown table or a database that
    cannot be dropped because it does not exist (the notes produced by
    `drop .. if exists'), plus the PY_SSIZE_T_CLEAN message, are ignored.
    The filters are installed only the first time this is called; later
    calls do nothing.
    """
    # The drop warnings could also be disabled with a set command.
    global _mySqlErrorOnWarnDone
    if _mySqlErrorOnWarnDone:
        return
    warnings.filterwarnings('error', category=MySQLdb.Warning)
    # Each filter is added after the 'error' one, in this order.
    for benign in ("Unknown table '.*'",
                   "Can't drop database '.*'; database doesn't exist",
                   "PY_SSIZE_T_CLEAN will be required for '#' formats"):
        warnings.filterwarnings("ignore", message=benign)
    _mySqlErrorOnWarnDone = True
def connect(*, host=None, port=None, user=None, passwd=None, db=None, cursorclass=None,
            conv=None):
    """Open a MySQL connection from explicit, keyword-only parameters.

    Only the arguments that are not None are forwarded to MySQLdb.Connect.
    Use cursorclass=mysqlOps.DictCursor to get dictionary results.
    The warning filters of mySqlSetErrorOnWarn() are installed first if
    that has not been done yet.
    """
    if not _mySqlErrorOnWarnDone:
        mySqlSetErrorOnWarn()
    candidates = (("host", host), ("port", port), ("user", user), ("passwd", passwd),
                  ("db", db), ("cursorclass", cursorclass), ("conv", conv))
    # Leave out unset options rather than passing None for them.
    kwargs = {name: value for name, value in candidates if value is not None}
    return MySQLdb.Connect(**kwargs)
def execute(conn, sql, args=None):
    """Run an SQL statement that returns no result on a connection.

    A cursor is obtained from conn, the statement is executed with args,
    and the cursor is closed afterwards even if execution raises.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, args)
    finally:
        cursor.close()
def query(conn, sql, args=None, *, cursorclass=None):
    """Generator that runs an SQL query on a connection, yielding each row.

    The cursor is created with the given cursorclass and is closed when the
    generator finishes, is closed, or raises.  As this is a generator, the
    query is not executed until iteration starts.
    """
    cursor = conn.cursor(cursorclass=cursorclass)
    try:
        cursor.execute(sql, args)
        for record in cursor:
            yield record
    finally:
        cursor.close()
def getTablesLike(conn, pattern, db=None):
    """Return a list of the names of tables matching an SQL LIKE pattern.

    conn is an open connection, pattern is the LIKE pattern ('%' and '_'
    are wildcards), and db optionally names the database to list instead
    of the connection's current one.

    The pattern is passed as a query parameter so the driver quotes it;
    a pattern containing a quote character can no longer break the
    statement.
    """
    # A database name is an identifier and cannot be bound as a parameter, so
    # it is still placed in the statement text and must come from a trusted
    # source.  Any '%' in it is doubled because the statement is
    # %-interpolated once arguments are supplied.
    frm = "" if db is None else "from " + db.replace("%", "%%")
    sql = "show tables {} like %s".format(frm)
    cur = conn.cursor(cursorclass=Cursor)
    try:
        cur.execute(sql, (pattern,))
        return [row[0] for row in cur]
    finally:
        cur.close()
def haveTablesLike(conn, pattern, db=None):
    """Return True if at least one table matches the LIKE pattern.

    Arguments are passed unchanged to getTablesLike().
    """
    matches = getTablesLike(conn, pattern, db)
    return bool(matches)
def time_sql(host='genome-mysql.soe.ucsc.edu', user='genome', db='hg19'):
    """Connect to a MySQL server, disconnect, and print how long it took.

    With the default arguments this connects to the main UCSC SQL server.
    Both elapsed real time and the CPU time used by this process are
    printed, in seconds.
    """
    # perf_counter() is monotonic, so the interval cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    start_real_time = time.perf_counter()
    start_cpu_time = time.process_time()
    conn = connect(host=host, user=user, db=db)
    try:
        pass  # only connection set-up and tear-down are being timed
    finally:
        conn.close()
    end_real_time = time.perf_counter()
    end_cpu_time = time.process_time()
    elapsed_real_time = end_real_time - start_real_time
    elapsed_cpu_time = end_cpu_time - start_cpu_time
    print(f"Elapsed real time: {elapsed_real_time} seconds")
    print(f"Elapsed CPU time: {elapsed_cpu_time} seconds\n")
if __name__ == "__main__":
    # Run the connection timing 500 times when executed as a script.  The
    # guard keeps a plain import of this module from opening connections.
    for _ in range(500):
        time_sql()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment