Skip to content

Instantly share code, notes, and snippets.

@iklobato
Created May 9, 2024 18:22
Show Gist options
  • Save iklobato/b3233373eddff71f757ff698270cb712 to your computer and use it in GitHub Desktop.
Save iklobato/b3233373eddff71f757ff698270cb712 to your computer and use it in GitHub Desktop.
This script demonstrates the use of multiprocessing processes (`Writer`, `Drawer`, and `Assembler`) with synchronization using `Condition` and `Event` objects to execute sequential tasks in a concurrent environment.
import logging
from multiprocessing import Process, Condition, Event
import time
# Configure the root logger at import time. The format includes the process
# name and the calling function's name, so each log line identifies which
# worker process and which method emitted it.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s (%(levelname)s): [%(processName)s] (%(funcName)s): %(message)s",
    level=logging.DEBUG,
)
class Writer(Process):
    """Process that performs the writing step and then signals completion.

    The instance doubles as a context manager whose only job is to log the
    start and the end of the work done in ``run``.

    Args:
        condition: Condition object shared with the other processes; it is
            notified once writing has finished.
        event: Event object shared with the other processes; it is set once
            writing has finished.
    """

    def __init__(self, condition, event):
        super().__init__(name="Writer")
        self.event = event
        self.condition = condition

    def __enter__(self):
        """Log that work is starting and return this process object."""
        logging.info("Starting work.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Log that work is over; returns None, so exceptions propagate."""
        logging.info("Work completed.")

    def run(self):
        """Write for two seconds, notify the condition, then set the event."""
        with self:
            logging.info("Writing a part of the project...")
            time.sleep(2)
            logging.info("Finished writing.")
            # Explicit acquire/release is equivalent to ``with self.condition``.
            self.condition.acquire()
            try:
                self.condition.notify_all()
            finally:
                self.condition.release()
            # Signal that the writer has finished.
            self.event.set()
class Drawer(Process):
    """Process that draws once the writer has signalled completion.

    The instance doubles as a context manager whose only job is to log the
    start and the end of the work done in ``run``.

    Args:
        condition: Condition object shared with the assembler; it is notified
            once drawing has finished.
        event: Event object that the writer sets when writing has finished.
        drawing_done: Optional Event object that is set (while holding
            ``condition``) once drawing has finished. Sharing it with the
            assembler lets the assembler detect completion even when it
            starts waiting after the notification was sent. When omitted,
            only the bare notification is issued, as in the original code.
    """

    def __init__(self, condition, event, drawing_done=None):
        super().__init__(name="Drawer")
        self.condition = condition
        self.event = event
        self.drawing_done = drawing_done

    def run(self):
        """Wait for the writer, draw for two seconds, then publish completion."""
        with self:
            self.event.wait()  # Wait for the writer to finish
            # The work itself runs outside the condition lock so that the
            # assembler can enter its wait while drawing is in progress.
            logging.info("Writer is done! Now starting to draw...")
            time.sleep(2)
            logging.info("Finished drawing.")
            self._publish_done()

    def _publish_done(self):
        """Record that drawing is finished and wake every waiter."""
        with self.condition:
            # The flag is set under the lock so that a waiter checking it via
            # ``wait_for`` cannot miss the transition between check and wait.
            if self.drawing_done is not None:
                self.drawing_done.set()
            self.condition.notify_all()

    def __enter__(self):
        """Log that work is starting and return this process object."""
        logging.info("Starting work.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Log that work is over; returns None, so exceptions propagate."""
        logging.info("Work completed.")


class Assembler(Process):
    """Process that assembles once both the writer and the drawer are done.

    The instance doubles as a context manager whose only job is to log the
    start and the end of the work done in ``run``.

    Args:
        condition: Condition object that the drawer notifies when it finishes.
        event: Event object that the writer sets when writing has finished.
        drawing_done: Optional Event object that the drawer sets when drawing
            has finished. When provided, it is used as the predicate for the
            condition wait, which removes the lost-wakeup hang of a bare
            ``condition.wait()``. When omitted, the original bare wait is
            used and a notification sent before the wait begins is missed.
    """

    def __init__(self, condition, event, drawing_done=None):
        super().__init__(name="Assembler")
        self.condition = condition
        self.event = event
        self.drawing_done = drawing_done

    def run(self):
        """Wait for writer and drawer, then assemble for two seconds."""
        with self:
            self.event.wait()  # Ensure the writer has finished
            self._wait_for_drawer()  # Ensure the drawer has finished
            logging.info("Drawer is done! Assembling...")
            time.sleep(2)
            logging.info("Finished assembling.")

    def _wait_for_drawer(self):
        """Block until the drawer has finished."""
        with self.condition:
            if self.drawing_done is None:
                # Legacy two-argument construction: no shared flag exists, so
                # fall back to the original bare wait.
                self.condition.wait()
            else:
                # Returns immediately if the flag is already set, otherwise
                # re-checks it after every notification.
                self.condition.wait_for(self.drawing_done.is_set)

    def __enter__(self):
        """Log that work is starting and return this process object."""
        logging.info("Starting work.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Log that work is over; returns None, so exceptions propagate."""
        logging.info("Work completed.")


if __name__ == '__main__':
    condition = Condition()
    event = Event()  # set by the writer when writing is finished
    drawing_done = Event()  # set by the drawer when drawing is finished

    writer_process = Writer(condition, event)
    drawer_process = Drawer(condition, event, drawing_done)
    assembler_process = Assembler(condition, event, drawing_done)

    workers = (writer_process, drawer_process, assembler_process)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    logging.info("!")
# Sample log output kept as a bare string literal (it has no runtime effect).
# NOTE(review): the Assembler lines "Drawer is done! Assembling..." and
# "Finished assembling." do not appear below, and the Assembler's
# "Work completed." is logged six seconds after the Drawer's. This looks like
# a run in which the Assembler never received the Drawer's notification --
# confirm before treating this as the expected output.
"""
2024-05-09 15:15:45 (INFO): [Writer] (__enter__): Starting work.
2024-05-09 15:15:45 (INFO): [Writer] (run): Writing a part of the project...
2024-05-09 15:15:45 (INFO): [Drawer] (__enter__): Starting work.
2024-05-09 15:15:45 (INFO): [Assembler] (__enter__): Starting work.
2024-05-09 15:15:47 (INFO): [Writer] (run): Finished writing.
2024-05-09 15:15:47 (INFO): [Writer] (__exit__): Work completed.
2024-05-09 15:15:47 (INFO): [Drawer] (run): Writer is done! Now starting to draw...
2024-05-09 15:15:49 (INFO): [Drawer] (run): Finished drawing.
2024-05-09 15:15:49 (INFO): [Drawer] (__exit__): Work completed.
2024-05-09 15:15:55 (INFO): [Assembler] (__exit__): Work completed.
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment