Created
March 14, 2024 01:16
-
-
Save jainal09/8067cd488fbd8f90fabef7f8cb0264f7 to your computer and use it in GitHub Desktop.
Graceful Shutdown Async Python code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import signal | |
import random | |
import os | |
# Unbounded queue carrying work items from the producer to the consumers.
# NOTE: both primitives are built at import time, before asyncio.run() starts
# its loop. That is fine on Python 3.10+; on older versions asyncio primitives
# bind to the loop that is current at construction, so they would have to be
# created inside main() instead.
queue = asyncio.Queue()

# Set once a shutdown signal arrives: the producer stops producing and the
# consumers exit after the queue has been drained.
stop_event = asyncio.Event()
async def producer(interval=1):
    """Enqueue a dummy event every *interval* seconds until shutdown.

    Args:
        interval: Pause between two produced events, in seconds.

    The pause is implemented as a timed wait on ``stop_event`` rather than a
    plain sleep, so the producer returns as soon as shutdown is requested
    instead of finishing the current pause first.
    """
    while not stop_event.is_set():
        # Simulate adding an event to the queue
        event = "Event"
        await queue.put(event)
        print(f"Produced: {event}")
        try:
            # Returns early the moment stop_event is set.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # Normal case: the pause elapsed without a shutdown request.
            pass
async def heavy_task(task_id, duration=5):
    """Simulate a long-running job by sleeping for *duration* seconds.

    Args:
        task_id: Identifier shown in the start/finish messages.
        duration: How long the simulated work takes, in seconds.

    The sleep is cooperative, so other coroutines keep running meanwhile.
    Real CPU-bound work would block the event loop and should be moved to an
    executor instead.
    """
    print(f"Heavy task {task_id} started")
    await asyncio.sleep(duration)  # Simulate processing time
    print(f"Heavy task {task_id} finished")
async def consumer(consumer_id):
    """Process queued events until shutdown is requested and the queue is empty.

    Args:
        consumer_id: Identifier shown in log messages and passed to
            ``heavy_task``.

    Waiting for the next item is raced against ``stop_event``. A bare
    ``await queue.get()`` on an empty queue would never wake up once the
    producer has stopped, and the consumer would hang instead of exiting.
    """
    while True:
        if stop_event.is_set() and queue.empty():
            print(f"Consumer {consumer_id} exiting")
            break

        print(f"Queue length: {queue.qsize()}")

        get_task = asyncio.ensure_future(queue.get())
        stop_task = asyncio.ensure_future(stop_event.wait())
        try:
            await asyncio.wait(
                {get_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # Never leave helper tasks behind; cancel() is a no-op on a task
            # that has already finished.
            stop_task.cancel()
            get_task.cancel()

        if not get_task.done():
            # Shutdown won the race: let the cancellation settle so the
            # getter is removed from the queue before we loop.
            await asyncio.wait({get_task})
        if get_task.cancelled():
            # No item was taken (a cancelled get() leaves the queue intact);
            # re-evaluate the exit condition at the top of the loop.
            continue

        event = get_task.result()
        print(f"Consumer {consumer_id} consumed: {event}")
        try:
            # Simulate heavy task processing
            await heavy_task(consumer_id)
        finally:
            # Always balance the get(), even if processing fails, so that a
            # queue.join() elsewhere cannot block forever.
            queue.task_done()
def signal_handler(sig, frame):
    """Announce the received signal and request shutdown via ``stop_event``.

    Args:
        sig: Number of the signal that was delivered.
        frame: Stack frame that was interrupted; unused.
    """
    caught = signal.Signals(sig)
    print(f"⚡⚡⚡⚡⚡⚡ Signal caught: {caught.name} ⚡⚡⚡⚡⚡")
    stop_event.set()
# Route both Ctrl+C (SIGINT) and termination requests (SIGTERM) to the handler
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
async def main():
    """Run one producer and two consumers until a shutdown signal arrives.

    After ``stop_event`` is set, waits for the producer to return and for the
    consumers to finish before printing the final message.
    """
    # Prefer loop-level signal handling: callbacks registered this way run
    # inside the event loop and may safely touch asyncio primitives such as
    # stop_event, and they wake the loop immediately.
    loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, signal_handler, signum, None)
        except (NotImplementedError, RuntimeError):
            # Unsupported here (e.g. Windows event loops or a non-main
            # thread): keep whatever signal.signal() handler is installed.
            pass

    # Start the producer
    producer_task = asyncio.create_task(producer())
    # Start two consumers
    consumer_tasks = [asyncio.create_task(consumer(i)) for i in range(2)]

    # Block until shutdown has been requested
    await stop_event.wait()

    # Wait for the producer to return and for all consumers to finish
    # processing the remaining items.
    await asyncio.gather(producer_task, *consumer_tasks)
    print("All tasks processed. Exiting.")
if __name__ == "__main__":
    # Show the PID so a signal can be sent from another terminal,
    # e.g. `kill -TERM <pid>`.
    pid = os.getpid()
    print(f"The process id: {pid}")
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment