Skip to content

Instantly share code, notes, and snippets.

@utdrmac
Last active September 27, 2023 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save utdrmac/6516ebc87090a9295baf98dd93564baf to your computer and use it in GitHub Desktop.
Save utdrmac/6516ebc87090a9295baf98dd93564baf to your computer and use it in GitHub Desktop.
Execute queries in parallel on MySQL; Producer-Consumer in Python
import sys
import argparse
import mysql.connector
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
def parseargs():
parser = argparse.ArgumentParser(prog="mysql_queries_prod_consumer.py", add_help=False)
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument("-h", dest="dbHost")
parser.add_argument("-u", dest="dbUser")
parser.add_argument("-p", dest="dbPass")
parser.add_argument("-t", dest="numWorkers", type=int, default=4)
parser.add_argument("queriesFile", help="Filename containing SQL queries to execute")
args = parser.parse_args()
return args
def consumer(queue, thd, argv):
dbHost = argv.dbHost
dbUser = argv.dbUser
dbPass = argv.dbPass
print(f"[{thd}] Connecting to MySQL {dbUser}@{dbHost}")
mydb = mysql.connector.connect(host=dbHost, user=dbUser, password=dbPass, autocommit=True)
cursor = mydb.cursor(dictionary=True)
print(f"[{thd}] Connected")
while not queue.empty():
try:
item = queue.get()
print(f"[{thd}] Executing '{item[:50]}'")
cursor.execute(item)
except Exception as e:
print(f"!! [{thd}] Exception - {e}\n\n'{item}'")
queue.task_done()
print(f"-- Tasks Remaining: {queue.qsize()}")
def main():
args = parseargs()
print(f"{args}")
numWorkers = args.numWorkers
queriesFile = args.queriesFile
print(f"-- Queries File: {queriesFile}")
print(f"-- DBH: {args.dbUser}@{args.dbHost}")
print(f"-- Thds: {args.numWorkers}")
q = Queue()
# This the 'producer'. Read queries from a file and add them to the queue
print("## Adding SQL to queue")
with open(queriesFile, 'r') as f:
for line in f:
q.put(line.strip())
qsize = q.qsize()
if qsize < 1:
print("No queries added to queue")
sys.exit(1)
else:
print(f"{qsize} queries added to queue")
# Launch 8 consumer threads
print("## Launching threads")
with ThreadPoolExecutor(max_workers=numWorkers) as executor:
for i in range(numWorkers):
print(f"### Thread {i} starting")
executor.submit(consumer, q, i, args)
# Wait for all threads to finish
q.join()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment