Skip to content

Instantly share code, notes, and snippets.

@MarcinKonowalczyk
Last active April 2, 2024 13:45
Show Gist options
  • Save MarcinKonowalczyk/bffd880bbdac34d1f8c5e7751a06a2a7 to your computer and use it in GitHub Desktop.
Save MarcinKonowalczyk/bffd880bbdac34d1f8c5e7751a06a2a7 to your computer and use it in GitHub Desktop.
Pingpong between two processes using a single JoinableQueue
from typing import cast
from multiprocessing import Manager, Process
from multiprocessing.queues import JoinableQueue
import time
class Palette(Process):
    """Worker process that bounces messages back and forth through one queue.

    Each worker repeatedly takes a message from the shared queue, marks it
    as done, pauses for ``sleep_time`` seconds, puts its own greeting on the
    queue and then blocks in ``queue.join()`` until that greeting has been
    marked as done in turn.
    """

    def __init__(
        self,
        queue: JoinableQueue,
        name: str,
        sleep_time: float = 1.0,
    ):
        """Store the shared queue and pacing for this worker.

        Args:
            queue: Joinable queue shared by all workers.
            name: Process name; also used in the messages this worker sends.
            sleep_time: Seconds to pause between receiving and replying.
        """
        super().__init__(name=name)
        self.queue = queue
        self.sleep_time = sleep_time
        self.n_received = 0  # number of messages this worker has taken so far

    def run(self) -> None:
        """Receive, acknowledge, pause and reply in an endless loop.

        The loop has no exit condition of its own; it ends only when the
        process is stopped from outside or a queue operation raises.
        """
        while True:
            item = self.queue.get(block=True, timeout=None)
            # Count the message before reporting, so the printed total
            # includes the item that was just received.
            self.n_received += 1
            print(f"{self.name} got '{item}' from the queue (total: {self.n_received})")
            self.queue.task_done()  # signal that we've taken the item
            time.sleep(self.sleep_time)
            self.queue.put(f"hello from {self.name}")
            # Block until every item put on the queue has been marked done,
            # i.e. until the reply above has been taken and acknowledged.
            self.queue.join()
if __name__ == "__main__":
    # The manager is used as a context manager so that its server process
    # is shut down when the script leaves the block.
    with Manager() as manager:
        q = cast(JoinableQueue, manager.JoinableQueue())  # type: ignore
        processes = [
            Palette(q, name="Ping", sleep_time=0.01),
            Palette(q, name="Pong", sleep_time=0.02),
            # Palette(q, name="Blip", sleep_time=0.03),
            # Palette(q, name="Blop", sleep_time=0.04),
        ]
        for p in processes:
            p.start()

        # Seed the exchange: the first worker to take this message starts
        # the back-and-forth.
        q.put("hello from main")

        try:
            # The workers loop forever; idle here until the user presses Ctrl-C.
            while True:
                time.sleep(1)
                # print("main")
        except KeyboardInterrupt:
            pass
        finally:
            # Stop every worker first, then wait for each of them to exit,
            # whichever way the wait loop above was left.
            for p in processes:
                p.terminate()
            for p in processes:
                p.join()
    print("done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment