Skip to content

Instantly share code, notes, and snippets.

@StevenCHowell
Last active July 18, 2022 16:09
Show Gist options
  • Save StevenCHowell/c40c8879b71ee9979231a40d6fda1cbe to your computer and use it in GitHub Desktop.
Save StevenCHowell/c40c8879b71ee9979231a40d6fda1cbe to your computer and use it in GitHub Desktop.
Demo performing parallel tasks with a queue for logging progress.
"""Demo performing parallel tasks with a queue for logging progress."""
import datetime
import multiprocessing as mp
import time
from collections import namedtuple
from xml.dom import minidom
import psutil
from lxml import etree
N_CORES = psutil.cpu_count(logical=False)
Msg = namedtuple('Msg', ['value', 'event', 'time'])
def worker(val, wait, queue):
"""Simulates a long running process"""
msg_start = Msg(val, 'start', datetime.datetime.now().isoformat())
queue.put(msg_start)
# time.sleep(random.random() * 5)
time.sleep(wait)
result = val ** 2
msg_stop = Msg(val, 'stop', datetime.datetime.now().isoformat())
queue.put(msg_stop)
return result
def find_run(value, log):
"""Find a specific run in the log file."""
runs = log.findall('run')
run = None
for run in runs:
if str(value) == run.find('valueIn').text:
break
return run
def listener(queue, log_str):
"""Listen for messages on the queue and write updates to the XML log."""
log = etree.fromstring(log_str)
fname = log.find('fname').text
while True:
msg = queue.get()
run = find_run(msg.value, log)
calculation = run.find('calculation')
status = log.find('sessionStatus')
status.attrib['lastChanged'] = str(msg.time)
with open(fname, 'w') as f:
if msg.event == 'start':
start_time = calculation.find('startTime')
start_time.text = msg.time
elif msg.event == 'stop':
stop_time = calculation.find('stopTime')
stop_time.text = msg.time
elif msg.event == 'complete':
status.text = 'Complete'
f.write(pretty_string(log))
break
f.write(pretty_string(log))
def create_xml_log(work, fname):
"""Create the XML log for the task queue."""
log = etree.Element('multiProcessingLog')
now = datetime.datetime.now()
session_time = etree.SubElement(log, 'sessionTime')
session_time.text = str(now.isoformat())
fname_field = etree.SubElement(log, 'fname')
fname_field.text = str(fname)
status = etree.SubElement(log, 'sessionStatus',
attrib={'lastChanged': str(now.isoformat())})
status.text = 'Incomplete'
for work_data in work:
value = work_data[0]
run = etree.SubElement(log, 'run')
value_in = etree.SubElement(run, 'valueIn')
value_in.text = str(value)
calculation = etree.SubElement(run, 'calculation')
start = etree.SubElement(calculation, 'startTime')
start.text = 'NA'
stop = etree.SubElement(calculation, 'stopTime')
stop.text = 'NA'
return log
def pretty_string(elem):
"""Pretty-ify the XML log string."""
raw = etree.tostring(elem, encoding='unicode')
pretty = minidom.parseString(raw).toprettyxml(indent=' ')
return pretty
def main():
"""Setup and execute the worker and log processes."""
work = (
[1, 5],
[2, 20],
[3, 22],
[4, 18],
[5, 9],
[6, 2],
[7, 8],
[8, 1],
[9, 0.5],
[10, 0.1],
[11, 3],
[12, 6],
[13, 14],
[14, 4],
[15, 13],
[16, 0.5],
[17, 0.8],
[18, 16],
[19, 0.02],
[20, 0.01],
[21, 0.009],
[22, 0.008],
)
fname_xml = datetime.datetime.now().strftime('%y%m%d_%H%M%S_mp_log.xml')
log = create_xml_log(work, fname_xml)
# must use Manager queue here, or will not work
manager = mp.Manager()
queue = manager.Queue()
# queue_status = manager.Queue()
with mp.Pool(processes=N_CORES) as pool:
# put listener to work first
log_str = etree.tostring(log)
pool.apply_async(listener, (queue, log_str))
# fire off workers
jobs = []
job_dict = {}
for val, wait in work:
job = pool.apply_async(worker, (val, wait, queue))
jobs.append(job)
job_dict[val] = job
while job_dict:
for val in list(job_dict):
if job_dict[val].ready():
del job_dict[val]
print(f'job {val} finished')
time.sleep(1)
for job in jobs:
job.get()
# now we are done, kill the listener
msg_done = Msg('nan', 'complete', datetime.datetime.now().isoformat())
queue.put(msg_done)
if __name__ == "__main__":
main()
print(r'\m/ >.< \m/')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment