Skip to content

Instantly share code, notes, and snippets.

@icedraco
Last active July 17, 2022 16:46
Show Gist options
  • Save icedraco/83028ecc2f9cd03274b002cdf453b3f5 to your computer and use it in GitHub Desktop.
Save icedraco/83028ecc2f9cd03274b002cdf453b3f5 to your computer and use it in GitHub Desktop.
Multiprocessing example
import time
import multiprocessing as mp
def main():
# how many processes should we spawn?
num_parallel_procs = 12
# queue that will pass work to each worker
q = mp.Queue(num_parallel_procs)
# create/init process objects to spawn; provides our q to each worker
print(f"Creating {num_parallel_procs} workers...")
procs = [
mp.Process(target=proc_do_stuff, name=f'worker-{i}', args=(q,), daemon=True)
for i
in range(num_parallel_procs)
]
# each Process must be started - do it now
[p.start() for p in procs]
# this will take a while; make sure we handle CTRL+C gracefully
try:
print("Workers started - sending workloads...")
for i in range(100):
# note: this call is blocking - it will pause the main process for
# as long as the queue remains full! KeyboardInterrupt will
# interrupt the paused state, see below.
q.put(f"Workload #{i}")
# send out X termination commands (one for each worker)
# so that when workers go through all the assignments,
# their next "assignment" would be to shut down.
print(f"Sending termination commands to {num_parallel_procs} workers...")
[q.put(None) for _ in procs]
except KeyboardInterrupt:
print("Main process caught CTRL+C: telling workers to wrap up earlier!")
# this will interrupt the loop above - not all workloads will be sent
finally:
# If we just caught CTRL+C, DON'T send shutdown command to workers!
# This is because each worker also gets a CTRL+C and will shut down
# before even seeing that command!
# This also means that you risk having the main process block again
# trying to send Nones into a full queue that hasn't been (and will never be)
# fully processed!
pass
print("Waiting for all workers to shut down...")
[p.join() for p in procs]
print("ALL DONE!")
return 0
def proc_do_stuff(q_input):
"""
This function will be run by each process we spawn with mp.Process(...).start()
:param mp.Queue q_input: input queue through which the MainProcess sends us work
"""
my_name = mp.current_process().name
# capture CTRL+C from stdin
try:
print(f"{my_name}: Started; processing work...")
while True:
# get the next workload from the queue
# this call blocks (pauses the process) for as long as the queue
# is empty
workload = q_input.get()
# check if we are asked to shut down
if workload is None:
break
# process workload
print(f"{my_name}: Processing workload {workload}")
time.sleep(0.8) # let's say we're calculating something here
# done; we can dump the result to a file, or a q_output, if one
# was provided, and let a "sink" process handle all the outputs.
print(f"{my_name}: Done processing {workload}!")
except KeyboardInterrupt:
print(f"{my_name}: CAUGHT CTRL+C!")
finally:
print(f"{my_name}: Shutting down...")
if __name__ == '__main__':
raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment