Skip to content

Instantly share code, notes, and snippets.

@serihiro
Last active May 31, 2019 05:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save serihiro/6374d1cc32268b87b5840efafde12aa8 to your computer and use it in GitHub Desktop.
Save serihiro/6374d1cc32268b87b5840efafde12aa8 to your computer and use it in GitHub Desktop.
python mulitprocessing.Queue test for each Context. See also https://gist.github.com/serihiro/bb9614b816b9c35e95ec4b53d3382f72
import multiprocessing
import queue
import argparse
import time
def task1(out_queue):
for i in range(0, 5):
out_queue.put(i)
def task2(terminate, in_queue, out_queue):
while not terminate.is_set():
while True:
try:
value = in_queue.get(timeout=0.01)
except queue.Empty:
if terminate.is_set():
return
else:
break
value += 1
while True:
try:
out_queue.put(value)
except queue.Full:
if terminate.is_set():
return
else:
break
def task3(terminate, in_queue, out_queue):
while not terminate.is_set():
while True:
try:
value = in_queue.get(timeout=0.01)
except queue.Empty:
if terminate.is_set():
return
else:
break
value *= 100
while True:
try:
out_queue.put(value)
except queue.Full:
if terminate.is_set():
return
pass
else:
break
def main():
queue1 = multiprocessing.Queue(100)
queue2 = multiprocessing.Queue(100)
queue3 = multiprocessing.Queue(100)
terminate = multiprocessing.Event()
p1 = multiprocessing.Process(target=task1, args=[queue1])
p2 = multiprocessing.Process(target=task2, args=[terminate, queue1, queue2])
p3 = multiprocessing.Process(target=task3, args=[terminate, queue2, queue3])
p1.start()
p2.start()
p3.start()
time.sleep(0.5)
terminate.set()
p1.join()
p2.join()
p3.join()
for _ in range(0, 5):
print(queue3.get(timeout=1))
# 100
# 200
# 300
# 400
# 500
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--start_method", type=str, default="fork")
args = parser.parse_args()
multiprocessing.set_start_method(args.start_method)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment