Skip to content

Instantly share code, notes, and snippets.

@jainal09
Created March 14, 2024 01:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jainal09/8067cd488fbd8f90fabef7f8cb0264f7 to your computer and use it in GitHub Desktop.
Save jainal09/8067cd488fbd8f90fabef7f8cb0264f7 to your computer and use it in GitHub Desktop.
Graceful Shutdown Async Python code
import asyncio
import signal
import random
import os
# Shared work queue: producer() puts events on it and every consumer() takes
# them off. No maxsize is passed to the constructor.
queue = asyncio.Queue()
# Shutdown flag: set by signal_handler(). producer() stops generating once it
# is set; consumer() exits once it is set and the queue is empty.
# NOTE(review): both objects are created at import time, before asyncio.run()
# starts a loop. On Python < 3.10 asyncio primitives were bound to an event
# loop when constructed -- confirm the target interpreter version.
stop_event = asyncio.Event()
async def producer():
    """Enqueue one ``"Event"`` per second until shutdown is requested.

    Every pass puts the string ``"Event"`` on the shared ``queue``, reports
    it on stdout and then sleeps for one second.  ``stop_event`` is only
    examined at the top of a pass, so a sleep that is already in progress
    always runs to completion before the coroutine returns.
    """
    while True:
        if stop_event.is_set():
            return
        item = "Event"  # placeholder payload
        await queue.put(item)
        print(f"Produced: {item}")
        # Pace production; this is also where other tasks get to run.
        await asyncio.sleep(1)
async def heavy_task(task_id, duration=5):
    """Simulate a long-running job by sleeping for ``duration`` seconds.

    The work is stood in for by ``asyncio.sleep``, which suspends this
    coroutine and lets other tasks run in the meantime; unlike real
    CPU-bound code it does not block the event loop.

    Args:
        task_id: Label used in the start/finish messages.
        duration: Simulated processing time in seconds.  Defaults to 5,
            the value that used to be hard-coded.

    Returns:
        None.
    """
    print(f"Heavy task {task_id} started")
    await asyncio.sleep(duration)
    print(f"Heavy task {task_id} finished")
async def consumer(consumer_id, poll_interval=1.0):
    """Process queued events until shutdown is requested and the queue is empty.

    Each pass prints the current queue length, waits for an event, runs
    ``heavy_task`` for it and marks the queue item as done.

    The wait on ``queue.get()`` is bounded by ``poll_interval``.  An
    unbounded ``get()`` on an empty queue would never return after the
    producer stops, so the consumer would never see ``stop_event`` and the
    program could not exit.  Because of the bounded wait, an idle consumer
    prints the queue length once per ``poll_interval``.

    Args:
        consumer_id: Label used in log messages and passed to ``heavy_task``.
        poll_interval: Maximum number of seconds to wait for an event
            before re-checking the shutdown condition.
    """
    while not (stop_event.is_set() and queue.empty()):
        print(f"Queue length: {queue.qsize()}")
        try:
            event = await asyncio.wait_for(queue.get(), timeout=poll_interval)
        except asyncio.TimeoutError:
            # Nothing arrived in time; loop round to re-evaluate the exit
            # condition instead of blocking forever.
            continue
        print(f"Consumer {consumer_id} consumed: {event}")
        try:
            await heavy_task(consumer_id)
        finally:
            # Always balance the get(), even if processing fails, so the
            # queue's unfinished-task count stays accurate.
            queue.task_done()
    print(f"Consumer {consumer_id} exiting")
def signal_handler(sig, frame):
    """Report the received signal and request a graceful shutdown.

    This function is installed with ``signal.signal`` and therefore runs
    outside the event loop's normal callback flow.  Setting an asyncio
    event directly from here would not wake a loop that is waiting for
    I/O, so when a loop is running the ``set()`` call is handed to it
    through ``call_soon_threadsafe``, which also wakes the loop.

    Args:
        sig: Signal number delivered to the process.
        frame: Interrupted stack frame (unused).
    """
    signal_name = signal.Signals(sig).name
    print(f"⚡⚡⚡⚡⚡⚡ Signal caught: {signal_name} ⚡⚡⚡⚡⚡")
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread (the signal arrived
        # before or after asyncio.run()); nothing needs waking.
        stop_event.set()
    else:
        loop.call_soon_threadsafe(stop_event.set)
# Route both SIGINT and SIGTERM to the same shutdown handler.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
async def main():
    """Run one producer and two consumers until a shutdown is requested.

    After ``stop_event`` is set, the producer task is awaited before the
    consumers.  That guarantees it has fully stopped and propagates any
    exception it raised, rather than leaving an un-awaited task for
    ``asyncio.run()`` to cancel on exit.
    """
    producer_task = asyncio.create_task(producer())
    consumer_tasks = [asyncio.create_task(consumer(i)) for i in range(2)]

    # Block here until a shutdown has been requested.
    await stop_event.wait()

    # The producer only checks the flag between iterations; wait for it to
    # actually finish.
    await producer_task

    # Consumers return once the remaining queued events are processed.
    await asyncio.gather(*consumer_tasks)
    print("All tasks processed. Exiting.")
if __name__ == "__main__":
    # Print the PID first -- presumably so SIGINT/SIGTERM can be sent to
    # this process from another terminal.
    pid = os.getpid()
    print(f"The process id: {pid}")
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment