Skip to content

Instantly share code, notes, and snippets.

@XoseLluis
Created September 19, 2022 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save XoseLluis/586b1f38527404a1f3147bccaba8e345 to your computer and use it in GitHub Desktop.
Save XoseLluis/586b1f38527404a1f3147bccaba8e345 to your computer and use it in GitHub Desktop.
AsyncExecutor class to limit the number of python awaitables running at a given time
import asyncio
from dataclasses import dataclass
import random
from typing import Any
import json
@dataclass
class AsyncAction:
future: asyncio.Future
awaitable_fn: Any # function that returns an awaitable (coroutine, a task...)
args: list[Any]
kwargs: list[Any]
class AsyncExecutor:
def __init__(self, event_loop: asyncio.AbstractEventLoop, max_running_actions: int):
self.event_loop = event_loop
self.max_running_actions = max_running_actions
self.running_counter = 0
self.not_launched_actions = []
def submit(self, awaitable_fn, *args, **kwargs) -> asyncio.Future:
"""
receives a function to be executed when there's one available slot. That function returns and awaitable
"""
future = self.event_loop.create_future()
action = AsyncAction(future, awaitable_fn, args, kwargs)
if self.running_counter < self.max_running_actions:
self.running_counter += 1
# _run_action returns a coroutine, so if I'm not awaiting it need to run it as a task
#self._run_action(action)
asyncio.create_task(self._run_action(action))
else:
self.not_launched_actions.append(action)
return future
async def _run_action(self, action: AsyncAction):
# self.running_counter += 1
result = await action.awaitable_fn(*(action.args), **(action.kwargs))
self._process_result(action, result)
def _process_result(self, action: AsyncAction, result: Any):
self.running_counter -= 1
action.future.set_result(result)
if len(self.not_launched_actions):
self.running_counter += 1
asyncio.create_task(self._run_action(self.not_launched_actions.pop(0)))
async def mock_download(url: str, delay: int):
print("starting mock download")
await asyncio.sleep(delay)
return url.upper()
def create_download_task(url: str, delay: int):
print(create_download_task.__name__)
return asyncio.get_running_loop().create_task(mock_download(url, delay))
async def main():
async_executor = AsyncExecutor(asyncio.get_running_loop(), 4)
futures = []
for i in range(0,10):
delay = random.randint(1, 4)
if i % 2 == 0:
future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay)
else:
future = async_executor.submit(create_download_task, f"www.jesoutienslapolice.fr/post_{i}", delay)
future.add_done_callback(lambda fut: print(f"{fut.result()} done"))
futures.append(future)
future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay)
future.add_done_callback(lambda fut: print(f"{fut.result()} done"))
futures.append(future)
print(f"{len(futures)} submitted")
results = await asyncio.gather(*futures)
print(f"all finished: {json.dumps(results, indent=4)}")
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment