Skip to content

Instantly share code, notes, and snippets.

@onjin
Created April 2, 2024 11:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onjin/89f5dfcaf24d8116528404aa17e1a8ba to your computer and use it in GitHub Desktop.
Save onjin/89f5dfcaf24d8116528404aa17e1a8ba to your computer and use it in GitHub Desktop.
Supervised async Python workers
import asyncio
import argparse
import random
from typing import Any
async def worker(name: str, queue: asyncio.Queue[str], stop_event: asyncio.Event):
    """Consume items from *queue* until *stop_event* is set and the queue is drained.

    Each item is "processed" (printed, then a random sleep); a simulated random
    failure is logged and the loop continues.  ``task_done()`` is called only
    after processing finishes, so ``queue.join()`` cannot return while an item
    is still in flight (the original marked items done before processing them).
    """
    while not stop_event.is_set() or not queue.empty():
        # Poll with a timeout so a set stop_event is noticed promptly
        # instead of blocking forever on an empty queue.
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            continue  # Re-check stop_event / queue state.
        try:
            print(f"{name} processing {item}")
            if random.randint(0, 10) > 8:  # Simulate random failure
                raise Exception(f"{name} encountered an error with {item}")
            await asyncio.sleep(random.random())  # Simulate work
        except Exception as e:
            # Best-effort: log and keep consuming; the failed item is dropped.
            print(f"Error in {name}: {e}")
        finally:
            # Mark completion only after processing, so join() reflects real work.
            queue.task_done()
async def maintain_worker_pool(queue: asyncio.Queue[str], stop_event: asyncio.Event, num_workers: int):
    """Supervise *num_workers* ``worker`` tasks, restarting any that exit early.

    Polls once per second.  A task that finishes while *stop_event* is unset is
    treated as a crash: its exception (if any) is retrieved and logged — this
    avoids asyncio's "Task exception was never retrieved" warning — and a
    replacement task is started.  Returns once the stop event is set and every
    worker task has completed.
    """
    workers: list[asyncio.Task[Any]] = []
    for i in range(num_workers):
        worker_name = f"Worker {i+1}"
        task = asyncio.create_task(worker(worker_name, queue, stop_event))
        workers.append(task)
    while not stop_event.is_set() or not all(task.done() for task in workers):
        await asyncio.sleep(1)  # Regularly check the state of workers.
        # Restart failed workers.
        for i, task in enumerate(workers):
            if task.done() and not stop_event.is_set():  # Finished unexpectedly.
                # Retrieve the exception so asyncio does not warn about a
                # never-retrieved task exception.
                exc = None if task.cancelled() else task.exception()
                if exc is not None:
                    print(f"Worker {i+1} died with: {exc}")
                worker_name = f"Worker {i+1} (restarted)"
                print(f"{worker_name} is being restarted.")
                new_task = asyncio.create_task(worker(worker_name, queue, stop_event))
                workers[i] = new_task
    # All tasks are done here; gather retrieves any remaining results/exceptions.
    await asyncio.gather(*workers, return_exceptions=True)
async def main(args: argparse.Namespace) -> None:
    """Fill the queue, run the supervised worker pool, and shut down cleanly."""
    queue: asyncio.Queue[str] = asyncio.Queue()
    stop_event = asyncio.Event()

    # Enqueue some tasks.
    for index in range(args.queue_size):
        await queue.put(f"Item {index + 1}")

    # The supervisor maintains the worker pool and handles worker failures.
    supervisor_task = asyncio.create_task(
        maintain_worker_pool(queue, stop_event, args.number_of_workers)
    )

    await queue.join()     # Block until every enqueued item has been processed.
    stop_event.set()       # Signal workers and supervisor to stop.
    await supervisor_task  # Wait for the supervisor to finish.
if __name__ == "__main__":
    # CLI entry point: configure pool size and amount of simulated work.
    parser = argparse.ArgumentParser()
    parser.add_argument('-w', '--number-of-workers', type=int, default=3)
    parser.add_argument('-q', '--queue-size', type=int, default=50)
    asyncio.run(main(parser.parse_args()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment