This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import atexit | |
from contextlib import ExitStack | |
from anyio.from_thread import start_blocking_portal | |
exit_stack = ExitStack() | |
portal = exit_stack.enter_context(start_blocking_portal()) | |
atexit.register(exit_stack.close) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import time | |
import anyio | |
from anyio import create_tcp_listener | |
from anyio.abc import SocketAttribute, SocketStream | |
async def handle_connection(stream: SocketStream): | |
with stream: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
path = Path("MARKER-ö.txt") | |
path.touch() | |
path2 = next(p for p in Path.cwd().iterdir() if p.name.startswith("MARKER-")) | |
path2.chmod(0o777) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
__all__ = ["connect", "Connection", "Cursor"] | |
import sqlite3 | |
from collections.abc import Callable, Sequence | |
from functools import partial, wraps | |
from typing import Any | |
from anyio import CapacityLimiter, to_thread |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asyncio import run, sleep | |
from asyncpg import InterfaceError | |
from sqlalchemy.ext.asyncio import create_async_engine | |
async def main(): | |
engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb") | |
while True: | |
while True: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class _BlockingAsyncContextManager(AbstractContextManager): | |
_enter_future: Future | |
_exit_future: Future | |
_exit_event: Event | |
_exit_exc_info: Tuple[Optional[Type[BaseException]], Optional[BaseException], | |
Optional[TracebackType]] | |
def __init__(self, async_cm: AsyncContextManager[T_co], portal: 'BlockingPortal'): | |
self._async_cm = async_cm | |
self._portal = portal |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import as_completed | |
from anyio import start_blocking_portal, sleep | |
async def long_running_task(index): | |
await sleep(1) | |
print(f'Task {index} running...') | |
await sleep(index) | |
return f'Task {index} return value' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
test.py:13: note: Revealed type is 'Any' | |
test.py:13: note: 'reveal_type' always outputs 'Any' in unchecked functions | |
test.py:15: note: Revealed type is 'Any' | |
test.py:15: note: 'reveal_type' always outputs 'Any' in unchecked functions | |
test.py:17: note: Revealed type is 'Any' | |
test.py:17: note: 'reveal_type' always outputs 'Any' in unchecked functions | |
test.py:20: error: No overload variant of "foo" matches argument type "float" | |
test.py:20: note: Possible overload variants: | |
test.py:20: note: def foo(bar: Union[bytes, str]) -> str | |
test.py:20: note: def foo(bar: int) -> int |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TypedAttribute(Generic[T_Attr]): | |
""" | |
Generic class used to define typed attributes, for use in :class:`~TypedAttributeContainer`. | |
""" | |
class TypedAttributeContainer(metaclass=ABCMeta): | |
""" | |
Base class for classes that wish to provide typed extra attributes. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import socket | |
class DatagramProtocol(asyncio.DatagramProtocol): | |
def error_received(self, exc: Exception) -> None: | |
print('received error:', exc.__class__, ':', exc) | |
async def main(): |
NewerOlder