Skip to content

Instantly share code, notes, and snippets.

@rhoboro
Last active December 18, 2023 09:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rhoboro/64ab3863562032028c7a6e6e14a043e8 to your computer and use it in GitHub Desktop.
Save rhoboro/64ab3863562032028c7a6e6e14a043e8 to your computer and use it in GitHub Desktop.
Translate callback style API to async/await style API
import asyncio
from udp import DatagramReader, DatagramWriter, create_udp_client
async def listen_forever(writer: DatagramWriter, reader: DatagramReader) -> None:
while not writer.is_closing():
data = await reader.recv(2048)
print("Received:", data.decode())
async def main() -> None:
reader, writer = await create_udp_client("127.0.0.1", 9999)
listen_task = asyncio.create_task(listen_forever(writer, reader))
writer.sendto(b"Hello World!")
await asyncio.sleep(0.1)
writer.close()
try:
await listen_task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from asyncio import AbstractEventLoop, DatagramTransport, DatagramProtocol, Future
from typing import Self, cast
class DatagramReaderProtocol(DatagramProtocol):
def __init__(self, reader: "DatagramReader", loop: AbstractEventLoop) -> None:
self._loop = loop
self._reader = reader
self._transport = None
def connection_made(self, transport):
self._transport = transport
def datagram_received(self, data, addr):
self._reader.feed_data(data)
def connection_lost(self, exc):
print("Connection closed")
self._reader = None
self._transport = None
class DatagramReader:
def __init__(self, loop: AbstractEventLoop):
self._loop = loop
self._waiter: Future | None = None
self._buffer = bytearray()
def feed_data(self, data: bytes) -> None:
self._buffer.extend(data)
self._wakeup_waiter()
async def _wait_for_data(self) -> None:
self._waiter = self._loop.create_future()
try:
await self._waiter
finally:
self._waiter = None
def _wakeup_waiter(self) -> None:
waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(True)
async def recv(self, n: int) -> bytes:
if self._buffer:
data = bytes(memoryview(self._buffer)[:n])
del self._buffer[:n]
return data
await self._wait_for_data()
data = bytes(memoryview(self._buffer)[:n])
del self._buffer[:n]
return data
def close(self) -> None:
print("Close the socket")
waiter = self._waiter
if waiter is not None:
self._waiter = None
waiter.cancel()
self._buffer.clear()
class DatagramWriter:
def __init__(
self,
transport: DatagramTransport,
protocol: DatagramReaderProtocol,
reader: DatagramReader,
loop: AbstractEventLoop,
):
self._transport = transport
self._protocol = protocol
self._reader = reader
self._loop = loop
def sendto(self, data: bytes) -> None:
self._transport.sendto(data)
def close(self) -> None:
self._reader.close()
return self._transport.close()
def is_closing(self) -> bool:
return self._transport.is_closing()
async def create_udp_client(host: str, port: int) -> tuple[DatagramReader, DatagramWriter]:
loop = asyncio.get_running_loop()
reader = DatagramReader(loop)
protocol = DatagramReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_datagram_endpoint(lambda: protocol, remote_addr=(host, port))
writer = DatagramWriter(cast(DatagramTransport, transport), protocol, reader, loop)
return reader, writer
@rhoboro
Copy link
Author

rhoboro commented Dec 16, 2023

This gist is an implementation equivalent to the UDP Echo Client in the official documentation.
https://docs.python.org/3/library/asyncio-protocol.html#udp-echo-client

This is based on asyncio.open_connection implementation.
https://github.com/python/cpython/blob/3.12/Lib/asyncio/streams.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment