Skip to content

Instantly share code, notes, and snippets.

@vcokltfre
Created February 21, 2022 19:09
Show Gist options
  • Save vcokltfre/69bef173a015d08a44e93fd4cbdaadd8 to your computer and use it in GitHub Desktop.
Save vcokltfre/69bef173a015d08a44e93fd4cbdaadd8 to your computer and use it in GitHub Desktop.
from asyncio import Event
from typing import TypeVar, Generic
T = TypeVar("T")
class Unset:
    """Sentinel type marking a value that has not been assigned yet.

    A dedicated class (rather than ``None``) lets ``None`` remain a
    legitimate payload; callers detect the "no value" state with
    ``isinstance(value, Unset)``.
    """
class DataEvent(Event, Generic[T]):
    """An ``asyncio.Event`` that hands a payload to its waiters.

    ``set(data)`` stores ``data`` and raises the event's flag;
    ``await wait()`` blocks until the flag is raised and then returns the
    stored payload.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the underlying event with no payload stored.

        Positional and keyword arguments are forwarded unchanged to
        ``asyncio.Event.__init__``.
        """
        super().__init__(*args, **kwargs)
        # ``Unset`` instance == "no payload yet"; ``None`` stays a valid payload.
        self.data: T | Unset = Unset()

    async def wait(self) -> T:
        """Block until the event is set, then return the stored payload.

        Raises:
            ValueError: if the event flag is raised while ``data`` still
                holds the ``Unset`` sentinel.
        """
        await super().wait()
        if isinstance(self.data, Unset):
            raise ValueError("DataEvent data is not set at end of event wait.")
        return self.data

    def set(self, data: T) -> None:
        """Store ``data`` as the payload and set the event.

        Args:
            data: the value returned by ``wait()``.
        """
        # Store the payload first so it is in place by the time the flag
        # is raised.
        self.data = data
        # Without this call the flag is never set and ``wait()`` would
        # block forever.
        super().set()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment