Skip to content

Instantly share code, notes, and snippets.

@secemp9
Last active February 6, 2024 13:18
Show Gist options
  • Save secemp9/aa4aa0fcdbcf7c9b3e61fb608edd9937 to your computer and use it in GitHub Desktop.
Save secemp9/aa4aa0fcdbcf7c9b3e61fb608edd9937 to your computer and use it in GitHub Desktop.
Threading wait POC 4
import threading
import time
import signal
class ControlledExecution:
    """Run a simulated task in a worker thread and stop it cleanly on Ctrl+C.

    The shutdown request is a boolean flag (``shutdown``) guarded by ``lock``;
    ``condition`` wraps that same lock and is used to wake the worker as soon
    as a shutdown is requested instead of letting it sleep out its delay.

    Args:
        item_count: Number of simulated items the worker processes.
        item_duration: Seconds the worker spends on each item.
    """

    def __init__(self, item_count: int = 10, item_duration: float = 2.0) -> None:
        self.lock = threading.Lock()
        # The condition shares ``self.lock``; acquiring either one acquires
        # the same underlying (non-reentrant) lock, so never nest them.
        self.condition = threading.Condition(self.lock)
        self.shutdown = False
        self.item_count = item_count
        self.item_duration = item_duration

    def worker(self) -> None:
        """Process the simulated items, stopping early once shutdown is set."""
        print("Worker started. Press Ctrl+C to stop.")
        for i in range(self.item_count):  # Simulated task
            with self.condition:
                if self.shutdown:
                    print("Worker stopping early.")
                    return  # Ensure clean exit
            print(f"Processing item {i}...")
            # Simulate the time-consuming task with an interruptible wait:
            # signal_shutdown() notifies the condition, so the worker does not
            # have to sit out the full delay before noticing the request.
            with self.condition:
                self.condition.wait_for(
                    lambda: self.shutdown, timeout=self.item_duration
                )
        print("Worker finished processing all items.")

    def main_loop(self) -> None:
        """Start the worker and block until it has finished or been stopped.

        Installs ``handle_ctrl_c`` as the SIGINT handler for the duration of
        the call and puts the previous handler back before returning.
        ``signal.signal`` may only be called from the main thread, so this
        method must be too.
        """
        # Set initial signal handler for Ctrl+C, remembering the old one.
        previous_handler = signal.signal(signal.SIGINT, self.handle_ctrl_c)
        thread = threading.Thread(target=self.worker)
        thread.start()
        try:
            # Poll on thread liveness rather than on the shutdown flag, so the
            # loop also ends when the worker completes without any Ctrl+C.
            # The timed join keeps the main thread responsive to signals.
            while thread.is_alive():
                thread.join(timeout=1)
        except KeyboardInterrupt:
            # Just in case; handle_ctrl_c should catch the first Ctrl+C.
            # Still request shutdown so the final join below is short.
            self.signal_shutdown()
        finally:
            thread.join()
            # signal.signal() returns None when the previous handler was not
            # installed from Python; None cannot be passed back in.
            if previous_handler is not None:
                signal.signal(signal.SIGINT, previous_handler)

    def signal_shutdown(self) -> None:
        """Set the shutdown flag and wake every thread waiting on it."""
        # One acquisition only: ``condition`` and ``lock`` are the same lock.
        with self.condition:
            self.shutdown = True
            self.condition.notify_all()  # Wake up waiting threads

    def handle_ctrl_c(self, signum, frame) -> None:
        """SIGINT handler: request a graceful shutdown.

        Args:
            signum: Signal number passed by the ``signal`` module (unused).
            frame: Interrupted stack frame passed by the ``signal`` module
                (unused).
        """
        # Ignore further SIGINT signals before touching the lock, so a second
        # Ctrl+C cannot re-enter this handler while the lock is held.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        print("Shutdown signal received, shutting down gracefully...")
        self.signal_shutdown()
def main():
    """Run one ControlledExecution session, then report a clean exit."""
    runner = ControlledExecution()
    # Blocks until main_loop() returns.
    runner.main_loop()
    print("Main program exited gracefully.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment