Last active
July 18, 2022 16:09
-
-
Save StevenCHowell/c40c8879b71ee9979231a40d6fda1cbe to your computer and use it in GitHub Desktop.
Demo performing parallel tasks with a queue for logging progress.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Demo performing parallel tasks with a queue for logging progress.""" | |
import datetime | |
import multiprocessing as mp | |
import time | |
from collections import namedtuple | |
from xml.dom import minidom | |
import psutil | |
from lxml import etree | |
# Physical core count as reported by psutil; used to size the process pool.
# NOTE(review): psutil documents that this can be None when undetermined.
N_CORES = psutil.cpu_count(logical=False)

# Progress record placed on the queue: the work value it refers to, the event
# name ('start', 'stop' or 'complete'), and an ISO-8601 timestamp string.
Msg = namedtuple('Msg', 'value event time')
def worker(val, wait, queue):
    """Stand in for a long-running task: sleep, then square ``val``.

    A 'start' message is put on ``queue`` before the sleep and a 'stop'
    message after the result has been computed, each stamped with the
    current local time in ISO format.

    Args:
        val: Number to square; also used as the identifier in the messages.
        wait: Seconds to sleep to simulate work.
        queue: Queue-like object (needs ``put``) that receives the messages.

    Returns:
        ``val ** 2``.
    """
    queue.put(Msg(val, 'start', datetime.datetime.now().isoformat()))

    time.sleep(wait)
    squared = val ** 2

    queue.put(Msg(val, 'stop', datetime.datetime.now().isoformat()))
    return squared
def find_run(value, log):
    """Find the ``<run>`` element whose ``<valueIn>`` text matches ``value``.

    Args:
        value: Work value to look for; compared as ``str(value)``.
        log: Root element of the XML log (direct ``<run>`` children are
            searched).

    Returns:
        The first matching ``<run>`` element, or ``None`` when no run
        matches (previously the last run was returned by accident because
        the loop variable overwrote the ``None`` default).
    """
    target = str(value)
    for run in log.findall('run'):
        if run.find('valueIn').text == target:
            return run
    return None


def listener(queue, log_str):
    """Listen for messages on the queue and write updates to the XML log.

    The log is parsed from ``log_str``; the output file name is read from
    its ``<fname>`` element.  For every message the ``lastChanged``
    attribute of ``<sessionStatus>`` is refreshed and the whole log is
    rewritten to disk.  Handled events:

    * ``'start'`` / ``'stop'``: set the run's ``startTime`` / ``stopTime``.
    * ``'complete'``: set the session status to ``'Complete'``, write the
      log one last time, and return.

    Any other event only refreshes ``lastChanged``.

    Args:
        queue: Queue-like object (needs a blocking ``get``) yielding objects
            with ``value``, ``event`` and ``time`` attributes.
        log_str: Serialized XML log as produced by ``etree.tostring``.
    """
    log = etree.fromstring(log_str)
    fname = log.find('fname').text
    status = log.find('sessionStatus')
    time_fields = {'start': 'startTime', 'stop': 'stopTime'}

    while True:
        msg = queue.get()
        status.attrib['lastChanged'] = str(msg.time)

        finished = msg.event == 'complete'
        if finished:
            status.text = 'Complete'
        elif msg.event in time_fields:
            # Only start/stop messages refer to a run; the 'complete'
            # message carries a placeholder value that matches none.
            run = find_run(msg.value, log)
            if run is not None:
                calculation = run.find('calculation')
                calculation.find(time_fields[msg.event]).text = msg.time

        # Update the tree first and open the file afterwards, so a failure
        # while updating cannot leave a truncated, empty log on disk.
        with open(fname, 'w') as f:
            f.write(pretty_string(log))

        if finished:
            break
def create_xml_log(work, fname):
    """Build the initial XML log tree for a batch of work items.

    The root ``<multiProcessingLog>`` receives, in order: ``<sessionTime>``
    (creation time), ``<fname>`` (``str(fname)``), ``<sessionStatus>`` with
    text ``'Incomplete'`` and a ``lastChanged`` attribute equal to the
    creation time, and then one ``<run>`` per work item.  Each run holds a
    ``<valueIn>`` and a ``<calculation>`` whose ``<startTime>`` and
    ``<stopTime>`` are both initialised to ``'NA'``.

    Args:
        work: Iterable of indexable items; element 0 of each is the value.
        fname: Name of the file the log will later be written to.

    Returns:
        The root element of the new log.
    """
    stamp = datetime.datetime.now().isoformat()

    root = etree.Element('multiProcessingLog')
    etree.SubElement(root, 'sessionTime').text = stamp
    etree.SubElement(root, 'fname').text = str(fname)

    session_status = etree.SubElement(
        root, 'sessionStatus', attrib={'lastChanged': stamp})
    session_status.text = 'Incomplete'

    for entry in work:
        run_elem = etree.SubElement(root, 'run')
        etree.SubElement(run_elem, 'valueIn').text = str(entry[0])
        calc_elem = etree.SubElement(run_elem, 'calculation')
        # Placeholders until the listener records the real timestamps.
        for tag in ('startTime', 'stopTime'):
            etree.SubElement(calc_elem, tag).text = 'NA'

    return root
def pretty_string(elem):
    """Return ``elem`` serialized as indented, human-readable XML text.

    The element is serialized to a unicode string and re-parsed with
    ``minidom`` so that its ``toprettyxml`` formatter can be used.
    """
    document = minidom.parseString(etree.tostring(elem, encoding='unicode'))
    return document.toprettyxml(indent=' ')
def main():
    """Set up and execute the worker and log processes.

    Creates the XML log for a fixed table of ``[value, wait_seconds]`` work
    items, starts the listener in the pool, submits one worker per item,
    reports each job as it finishes, and finally tells the listener to mark
    the session complete.
    """
    work = (
        [1, 5],
        [2, 20],
        [3, 22],
        [4, 18],
        [5, 9],
        [6, 2],
        [7, 8],
        [8, 1],
        [9, 0.5],
        [10, 0.1],
        [11, 3],
        [12, 6],
        [13, 14],
        [14, 4],
        [15, 13],
        [16, 0.5],
        [17, 0.8],
        [18, 16],
        [19, 0.02],
        [20, 0.01],
        [21, 0.009],
        [22, 0.008],
    )

    fname_xml = datetime.datetime.now().strftime('%y%m%d_%H%M%S_mp_log.xml')
    log = create_xml_log(work, fname_xml)

    # must use Manager queue here, or will not work
    manager = mp.Manager()
    queue = manager.Queue()

    # The listener permanently occupies one pool process, so at least two
    # are required or no worker could ever run.  Fall back to the logical
    # CPU count when the physical core count is unavailable.
    n_procs = max(2, N_CORES or mp.cpu_count())

    with mp.Pool(processes=n_procs) as pool:
        # put listener to work first
        log_str = etree.tostring(log)
        listener_job = pool.apply_async(listener, (queue, log_str))

        # fire off workers
        jobs = []
        job_dict = {}
        for val, wait in work:
            job = pool.apply_async(worker, (val, wait, queue))
            jobs.append(job)
            job_dict[val] = job

        # poll once per second and report jobs as they finish
        while job_dict:
            for val in list(job_dict):
                if job_dict[val].ready():
                    del job_dict[val]
                    print(f'job {val} finished')
            time.sleep(1)

        # re-raise any exception that occurred inside a worker
        for job in jobs:
            job.get()

        # now we are done, tell the listener to finish
        msg_done = Msg('nan', 'complete', datetime.datetime.now().isoformat())
        queue.put(msg_done)

        # Wait for the listener before leaving the ``with`` block: exiting
        # it terminates the pool, which could otherwise kill the listener
        # before the final 'Complete' log is written.  This also surfaces
        # any exception raised inside the listener.
        listener_job.get()
# Script entry point: run the demo, then print a sign-off banner.
if __name__ == '__main__':
    main()
    print(r'\m/ >.< \m/')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment