Skip to content

Instantly share code, notes, and snippets.

@jgarvin
Last active December 15, 2023 01:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgarvin/abdec3b42fb6f3532185cd747b308802 to your computer and use it in GitHub Desktop.
Save jgarvin/abdec3b42fb6f3532185cd747b308802 to your computer and use it in GitHub Desktop.
Randomize asyncio awaits
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import random
import logging as log # noqa
from typing import TypeVar, Generator
import pytest
_T = TypeVar("_T")
class RandomizedFuture(asyncio.Future[_T]):
def __init__(self, *, loop: asyncio.AbstractEventLoop | None = None) -> None:
super().__init__(loop=loop or asyncio.get_event_loop())
self._actual_result: _T | None = None
self._result_set = False
self._result_really_set = False
def set_result(self, result: _T) -> None:
if self._result_set or self.done():
# Future already has a result set or is cancelled, raise an exception
raise asyncio.InvalidStateError("The future already has a result or is cancelled")
self._actual_result = result
self._result_set = True
self._randomly_set_or_delay_result()
def _randomly_set_or_delay_result(self) -> None:
if not self._result_really_set:
if random.choice([True, False]):
self._really_set_result()
else:
# Schedule another attempt with a slight delay, we use
# a nonzero delay b/c we are worried the event loop
# will push the call on the list it is currently
# iterating through and pick it up right away without
# actually giving anything else a chance to run.
self.get_loop().call_later(random.random()*0.01, self._randomly_set_or_delay_result)
def _really_set_result(self) -> None:
self._result_really_set = True
# We can't assert this b/c sometimes _T itself actually is None.
# So instead we disable the typecheck on `super().set_result()`
#
# assert self._actual_result is not None
super().set_result(self._actual_result) # type: ignore
def set_exception(self, exception: type | BaseException) -> None:
if self._result_set or self.done():
raise asyncio.InvalidStateError("The future already has a result or is cancelled")
# Directly set the exception, as we don't need to randomize this
self._result_really_set = True
super().set_exception(exception)
def done(self) -> bool:
# Override 'done' to consider if the result is set
return super().done() or self._result_set
def result(self) -> _T:
# Force setting the actual result if it's pending
if not super().done() and self._result_set:
self._really_set_result()
return super().result()
def exception(self) -> None | BaseException:
# Force setting the actual result if it's pending
if not super().done() and self._result_set:
self._really_set_result()
return super().exception()
class RandomizedEventLoop(asyncio.SelectorEventLoop):
    """Selector event loop that shuffles its timers and creates RandomizedFuture objects."""

    def _run_once(self) -> None:
        """Shuffle the scheduled timer handles, then run one regular loop iteration."""
        # mypy does not know about the private `_scheduled` attribute or the
        # private `_run_once` hook, hence the ignores.
        # NOTE(review): the base loop appears to maintain `_scheduled` with
        # heapq; shuffling would break that ordering, so timers may fire
        # later than requested -- presumably the intended chaos, confirm.
        pending_timers = self._scheduled  # type: ignore
        random.shuffle(pending_timers)
        super()._run_once()  # type: ignore

    def create_future(self) -> RandomizedFuture[_T]:
        """Return a new RandomizedFuture bound to this loop."""
        future: RandomizedFuture[_T] = RandomizedFuture(loop=self)
        return future
# Pytest fixture that supplies the custom event loop. To use it in a test
# module, import it there:
#
# from random_event_loop import event_loop
#
# pytest automatically recognizes the name "event_loop" when it is defined
# at the top level of the test module.
@pytest.fixture(scope="function")
def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
    """Yield a fresh RandomizedEventLoop for one test and close it afterwards."""
    log.info("Using random loop")
    randomized_loop = RandomizedEventLoop()
    yield randomized_loop
    # Teardown: runs when pytest resumes the generator after the test.
    randomized_loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment