This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
from .soft_cancel_scope import SoftCancelScope | |
async def main(): | |
async def do_stuff(num):
    """Announce job ``num``, await ``trio.sleep(1)``, then announce completion.

    Both messages include ``trio.current_time() - START_TIME``; ``START_TIME``
    is not defined in this block and must be supplied by the surrounding code.
    """
    # Report the start together with the clock offset from START_TIME.
    print(f'starting job {num} at t={trio.current_time()-START_TIME}')

    # The "work" itself is just a one-unit trio sleep.
    await trio.sleep(1)

    # Report completion with the same offset calculation.
    print(f'finished job {num} at t={trio.current_time()-START_TIME}')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cattr | |
class Cattrs:
    """A pydantic compatible field class that uses `cattrs` to structure data.

    Instances are hashed and compared through their ``_model`` attribute.
    NOTE(review): ``_model`` is assigned outside the portion of the class
    visible here -- confirm where it is set.
    """

    def __hash__(self):
        """Return a hash built from the concrete class and ``self._model``."""
        return hash((type(self), self._model))

    def __eq__(self, other):
        """Return whether ``other`` is the same class wrapping an equal ``_model``.

        Equality is decided by comparing the same key that ``__hash__`` uses,
        not by comparing hash values: distinct models can share a hash, and
        an unrelated object can happen to hash to the same number, so a
        hash-to-hash comparison can report false matches.

        Returns ``NotImplemented`` when ``other`` is not exactly the same
        class, so Python falls back to its default handling (which yields
        ``False`` for ``==``).
        """
        if type(other) is not type(self):
            return NotImplemented
        return self._model == other._model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
from .trio_loop import TrioLoop | |
async def foo(data):
    """Await ``trio.sleep(1)`` and then hand ``data`` back unchanged."""
    # Yield to trio for one unit of sleep before producing the result.
    await trio.sleep(1)
    return data
# Demo script: enter a TrioLoop context, pass the foo('hello world') coroutine
# to loop.wait, store what loop.wait returns in `data`, and print it.
# NOTE(review): presumably loop.wait blocks until the coroutine finishes and
# returns its result -- confirm against .trio_loop.
with TrioLoop() as loop: | |
data = loop.wait(foo('hello world')) | |
print(data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from .lifetime import Lifetime | |
from contextlib import contextmanager | |
@contextmanager | |
def merge_cancellation(): | |
try: | |
yield | |
except trio.MultiError as wrapper: | |
errors = wrapper.exceptions | |