@yiminglin-ai
Last active September 27, 2021 16:01
[parallelism] Python scripts on threading and multiprocessing
"""
tips from Jie:
1) Put worker_proc in a py file. The code may not work properly if you define it directly in your notebook. On the other hand, dispenser_proc can be defined in the notebook.
2) Use multiprocessing.Queue, not queue.Queue.
3) It's essential to have a job dispenser thread. Multiprocessing queue has a finite capacity. If you send jobs from your main thread, your code could hang.
If dispensing jobs in the main thread, it may hang. Reason: workers try to put stuff into the result queue, result queue becomes full, the main thread is not reading from result queue because it still tries to put stuff into the job queue, the workers hang such that they won’t read from the job queue, both workers are main thread hang
4) I use None to indicate 'end of queue'. It's important for the worker_proc to put None back into the job_queue once it gets a None, otherwise other workers may hang.
"""
import time
from multiprocessing import Queue, Process
from threading import Thread
import torch
from tqdm import tqdm
config = {}
job_list = list(range(32))
# Don't call torch.cuda.device_count() here: it initialises CUDA in the parent
# process, and forked workers then fail with "RuntimeError: Cannot re-initialize
# CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the
# 'spawn' start method".
NUM_GPUS = 2
PROC_PER_GPU = 2
JOB_BATCH = NUM_GPUS * PROC_PER_GPU
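# If you do need to query device_count(), one option (a sketch, assuming it runs
# before any worker process is created) is to switch to the 'spawn' start method,
# which starts workers in a fresh interpreter instead of forking:
#   import multiprocessing as mp
#   mp.set_start_method('spawn')          # must run once, in the main module
#   NUM_GPUS = torch.cuda.device_count()  # now safe to query CUDA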
# Function to be run in your worker process.
def worker_proc(gpu_queue, job_queue, result_queue):
    # Each worker claims a GPU id once, then consumes jobs until the sentinel.
    gpu = gpu_queue.get()
    while True:
        job = job_queue.get()
        if job is None:
            # Put the sentinel back so the other workers can also shut down.
            job_queue.put(None)
            break
        # Do the job (a toy workload: one conv forward pass on the assigned GPU).
        time.sleep(0.5)
        conv = torch.nn.Conv2d(3, 64, 3).cuda(gpu)
        out = conv(torch.randn(4, 3, 224, 224).cuda(gpu)).detach().cpu()
        result = f"job {job} ran on gpu {gpu}, output shape {tuple(out.shape)}"
        # DO NOT return results; use result_queue instead.
        result_queue.put(result)
# A thread for dispensing jobs and GPU ids to the workers.
def dispenser_proc(job_list, gpu_queue, job_queue):
    for job in job_list:
        job_queue.put(job)
    # Hand out each GPU id once per worker process on that GPU.
    for gpu in range(NUM_GPUS):
        for _ in range(PROC_PER_GPU):
            gpu_queue.put(gpu)
    # One None sentinel is enough: each worker puts it back before exiting.
    job_queue.put(None)
if __name__ == "__main__":
    # In the main thread, start the workers and the job dispenser.
    job_queue = Queue()
    result_queue = Queue()
    gpu_queue = Queue()
    workers = [
        Process(target=worker_proc, args=(gpu_queue, job_queue, result_queue))
        for _ in range(NUM_GPUS * PROC_PER_GPU)
    ]
    for worker in workers:
        worker.start()
    dispenser = Thread(target=dispenser_proc, args=(job_list, gpu_queue, job_queue))
    dispenser.start()

    # Aggregate results: exactly one result is expected per job.
    result_list = []
    for _ in tqdm(job_list, total=len(job_list)):
        result_list.append(result_queue.get())

    # Make sure the workers and the dispenser stop properly.
    for worker in workers:
        worker.join()
    dispenser.join()

    print(len(result_list))
    for r in result_list:
        print(r)
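
# An alternative worth noting (a sketch, not from the original gist):
# torch.multiprocessing.spawn uses the 'spawn' start method and joins the
# workers for you; the function it launches receives the process index as its
# first argument, so a hypothetical worker `fn(rank, job_queue, result_queue)`
# could derive its GPU as `rank % NUM_GPUS` instead of reading a gpu_queue.
#   import torch.multiprocessing as tmp
#   tmp.spawn(fn, args=(job_queue, result_queue), nprocs=NUM_GPUS * PROC_PER_GPU)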