Skip to content

Instantly share code, notes, and snippets.

@gustabot42
Last active June 9, 2024 05:29
Show Gist options
  • Save gustabot42/bbd4a5427a59980d98bb20315ff49d69 to your computer and use it in GitHub Desktop.
Save gustabot42/bbd4a5427a59980d98bb20315ff49d69 to your computer and use it in GitHub Desktop.
Asyncio Event With Result
import asyncio
import collections
from asyncio import Event
from collections.abc import Awaitable
from collections.abc import Callable
from collections.abc import Iterable
from result import Err
from result import Result
class EventResult(Event):
    """Like asyncio.Event, but set() stores a value that wait() returns.

    The event counts as "set" whenever the stored value is not None.
    Consequently ``set(None)`` leaves the event unset, and the first value
    stored wins until ``clear()`` is called.
    """

    def __init__(self):
        # Let the base class build its own state first, then replace the
        # boolean flag with a value slot where None means "unset".
        super().__init__()
        self._waiters = collections.deque()
        self._value: Result | None = None

    def __repr__(self):
        # Start from object.__repr__ rather than Event.__repr__: the latter
        # already appends its own "[set]"/"[unset]" suffix, which would
        # otherwise show up twice in the output.
        res = object.__repr__(self)
        extra = "set" if self._value is not None else "unset"
        if self._waiters:
            extra = f"{extra}, waiters:{len(self._waiters)}"
        return f"<{res[1:-1]} [{extra}]>"

    def is_set(self):
        """Return True if and only if the internal value is set (not None)."""
        return self._value is not None

    def set(self, value: Result):
        """Set the internal value. All coroutines waiting for it are awakened.

        Coroutines that call wait() once the value is set will not block at
        all. If a value is already stored, the call is ignored: the first
        value wins until clear() is called.
        """
        if self._value is None:
            self._value = value
            for fut in self._waiters:
                if not fut.done():
                    # Hand the value to each waiter through its future so it
                    # still receives it if clear() runs before it resumes.
                    fut.set_result(value)

    def clear(self):
        """Reset the internal value to None.

        Subsequently, coroutines calling wait() will block until set() is
        called to set the internal value again.
        """
        self._value = None

    async def wait(self):
        """Block until the internal value is set, then return it.

        If the internal value is set on entry, return it immediately.
        Otherwise, block until another coroutine calls set() and return the
        value that was passed to that call.
        """
        if self._value is not None:
            return self._value

        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            # The future carries the value; re-reading self._value here would
            # yield None if clear() was called in the meantime.
            return await fut
        finally:
            self._waiters.remove(fut)

    async def wait_for(self, timeout: float | None = None) -> Result:
        """Wait for the value for at most ``timeout`` seconds.

        Returns the value delivered by wait(), or ``Err("TimeoutError")`` if
        the timeout elapses first. ``timeout=None`` waits indefinitely.
        """
        try:
            return await asyncio.wait_for(self.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on; naming the asyncio one also
            # catches the timeout on older interpreters.
            return Err("TimeoutError")
import asyncio
from collections.abc import Callable
import pytest
from result import Err
from result import Ok
from hermes.utils.asyncio import EventResult
async def delay_callable(delay: float, func: Callable, *func_args, **func_kwargs):
    """Sleep for ``delay`` seconds, then call ``func`` and return its result.

    Positional and keyword arguments after ``func`` are forwarded to it
    unchanged.
    """
    await asyncio.sleep(delay)
    outcome = func(*func_args, **func_kwargs)
    return outcome
def test_is_set():
    """is_set() is false on a new event and true once a value is stored."""
    evt = EventResult()
    assert not evt.is_set()

    evt.set(Ok("value"))
    assert evt.is_set()
def test_clear():
    """clear() returns a previously set event to the unset state."""
    evt = EventResult()
    evt.set(Ok("value"))

    evt.clear()

    assert not evt.is_set()
@pytest.mark.asyncio()
async def test_wait():
    """wait() blocks until a delayed set() and then yields the stored value."""
    evt = EventResult()
    setter = delay_callable(0.1, evt.set, Ok("value"))
    _, received = await asyncio.gather(setter, evt.wait())
    assert received == Ok("value")
@pytest.mark.asyncio()
async def test_wait_for():
    """wait_for() without a timeout yields the value from a delayed set()."""
    evt = EventResult()
    setter = delay_callable(0.1, evt.set, Ok("value"))
    _, received = await asyncio.gather(setter, evt.wait_for())
    assert received == Ok("value")
@pytest.mark.asyncio()
async def test_wait_for_timeout():
    """wait_for() reports a timeout as Err when the event is never set."""
    evt = EventResult()
    outcome = await evt.wait_for(0.1)
    expected = Err("TimeoutError")
    assert outcome == expected
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment