Created
April 2, 2024 11:47
-
-
Save onjin/89f5dfcaf24d8116528404aa17e1a8ba to your computer and use it in GitHub Desktop.
supervised async python workers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import argparse | |
import random | |
from typing import Any | |
async def worker(name: str, queue: asyncio.Queue[str], stop_event: asyncio.Event): | |
while not stop_event.is_set() or not queue.empty(): | |
try: | |
# Try to get an item from the queue without indefinitely blocking | |
try: | |
item = await asyncio.wait_for(queue.get(), timeout=1) | |
queue.task_done() | |
except asyncio.TimeoutError: | |
continue # Check stop_event | |
print(f"{name} processing {item}") | |
if random.randint(0, 10) > 8: # Simulate random failure | |
raise Exception(f"{name} encountered an error with {item}") | |
await asyncio.sleep(random.random()) # Simulate work | |
except Exception as e: | |
print(f"Error in {name}: {e}") | |
# In case of error, we simply log it. The loop will continue. | |
async def maintain_worker_pool(queue: asyncio.Queue[str], stop_event: asyncio.Event, num_workers: int): | |
workers: list[asyncio.Task[Any]] = [] | |
for i in range(num_workers): | |
worker_name = f"Worker {i+1}" | |
task = asyncio.create_task(worker(worker_name, queue, stop_event)) | |
workers.append(task) | |
while not stop_event.is_set() or not all(task.done() for task in workers): | |
await asyncio.sleep(1) # Regularly check the state of workers | |
# Restart failed workers | |
for i, task in enumerate(workers): | |
if task.done() and not stop_event.is_set(): # Check if a worker has finished unexpectedly | |
worker_name = f"Worker {i+1} (restarted)" | |
print(f"{worker_name} is being restarted.") | |
new_task = asyncio.create_task(worker(worker_name, queue, stop_event)) | |
workers[i] = new_task | |
async def main(args: argparse.Namespace) -> None: | |
queue: asyncio.Queue[str] = asyncio.Queue() | |
stop_event = asyncio.Event() | |
num_workers = args.number_of_workers | |
# Enqueue some tasks | |
for i in range(args.queue_size): | |
await queue.put(f"Item {i+1}") | |
# Start the supervisor task to maintain worker pool and handle worker failures | |
supervisor_task = asyncio.create_task(maintain_worker_pool(queue, stop_event, num_workers)) | |
# Simulate work | |
await queue.join() | |
stop_event.set() # Signal to stop | |
# Wait for the supervisor to finish | |
await supervisor_task | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument('-w', '--number-of-workers', type=int, default=3) | |
parser.add_argument('-q', '--queue-size', type=int, default=50) | |
args = parser.parse_args() | |
asyncio.run(main(args)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment