Skip to content

Instantly share code, notes, and snippets.

@abersheeran
Last active March 24, 2020 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abersheeran/50fbac99b297636fc6a7a613d4fb8497 to your computer and use it in GitHub Desktop.
Save abersheeran/50fbac99b297636fc6a7a613d4fb8497 to your computer and use it in GitHub Desktop.
Get the sender's UDP address with asyncio
import sys
import asyncio
import typing
import logging
import signal
from socket import AF_INET, AF_INET6, inet_pton
# Optional dependency: when uvloop is importable, install it so that asyncio
# uses uvloop's event loop implementation for every loop created afterwards.
try:
    import uvloop
    uvloop.install()
except ImportError:
    # uvloop is not installed; keep the default asyncio event loop.
    pass
logger = logging.getLogger("echo")


class UDPEchoProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that replies to every packet with the sender's address.

    The reply payload is the packed binary form of the sender's IP address
    (as produced by ``inet_pton`` for ``address_family``) followed by the
    sender's port encoded as a 2-byte big-endian integer. The content of the
    incoming datagram is ignored.
    """

    def __init__(self, address_family: int) -> None:
        """Remember which address family (``AF_INET``/``AF_INET6``) to pack with."""
        # Assigned in connection_made(); stays None until the endpoint is open.
        self.transport: typing.Optional[asyncio.DatagramTransport] = None
        self.address_family = address_family

    def connection_made(self, transport: asyncio.DatagramTransport) -> None:
        """Store the transport and log the local address being served."""
        self.transport = transport
        logger.info(
            f"UDP Echo Server serving on {transport.get_extra_info('sockname')}."
        )

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Log that the endpoint was closed, including the cause if there is one."""
        if exc is not None:
            # Previously the failure reason was dropped without a trace.
            logger.warning(f"UDP Echo Server connection lost: {exc}")
        sockname = (
            self.transport.get_extra_info("sockname")
            if self.transport is not None
            else None
        )
        logger.info(f"UDP Echo Server {sockname} closed.")

    def datagram_received(self, data: bytes, address: typing.Tuple[str, int]) -> None:
        """Send the sender's packed IP address and port back to the sender.

        ``address`` is whatever the transport reports for the peer: its first
        two items are used as host and port. IPv6 peers may be reported with
        extra trailing items, which are passed through untouched to ``sendto``.
        """
        logger.info(f"Request from {address}")
        host, port = address[0], address[1]
        # An IPv6 host may carry a zone suffix ("fe80::1%eth0"); inet_pton
        # rejects that form, so only the numeric part is packed.
        numeric_host = host.partition("%")[0]
        payload = inet_pton(self.address_family, numeric_host) + port.to_bytes(2, "big")
        self.transport.sendto(payload, address)

    def error_received(self, exc: Exception) -> None:
        """Log an error reported by the transport for a send/receive operation."""
        logger.warning(f"UDP Endpoint received an error: {exc}")
def _open_udp_socket(family: int, host: str, port: int):
    """Return a non-blocking UDP socket of ``family`` bound to ``(host, port)``."""
    # Local import: the file only imports selected names from ``socket``.
    import socket

    sock = socket.socket(family, socket.SOCK_DGRAM)
    try:
        if family == socket.AF_INET6 and hasattr(socket, "IPV6_V6ONLY"):
            # Keep the IPv6 socket from also claiming the IPv4 port, so it can
            # coexist with the separate IPv4 socket without SO_REUSEADDR.
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        sock.setblocking(False)
        sock.bind((host, port))
    except Exception:
        sock.close()
        raise
    return sock


async def main(port: int = 1024) -> None:
    """Serve the UDP address-echo protocol on IPv4 and IPv6 until signalled.

    Opens one endpoint on ``0.0.0.0`` and one on ``::`` (both on ``port``),
    then waits until SIGINT or SIGTERM is received and closes both endpoints.

    Args:
        port: UDP port to bind on both address families. Defaults to 1024.

    Raises:
        OSError: if a socket cannot be created or bound.
    """
    loop = asyncio.get_event_loop()
    transports = []
    try:
        for family, host in ((AF_INET, "0.0.0.0"), (AF_INET6, "::")):
            # The sockets are created by hand instead of passing
            # ``reuse_address=True``: asyncio rejects that argument for
            # datagram endpoints on current Python versions.
            sock = _open_udp_socket(family, host, port)
            try:
                transport, _ = await loop.create_datagram_endpoint(
                    lambda family=family: UDPEchoProtocol(family),
                    sock=sock,
                )
            except Exception:
                sock.close()
                raise
            transports.append(transport)

        should_running = True

        def request_stop(signum, frame) -> None:
            # Signal handler: only flips the flag; the loop below notices it.
            nonlocal should_running
            should_running = False

        signal.signal(signal.SIGINT, request_stop)
        signal.signal(signal.SIGTERM, request_stop)

        # Poll once a second; the periodic wake-up ensures the flag is seen
        # even when no datagrams arrive.
        while should_running:
            await asyncio.sleep(1)
    finally:
        # Close in reverse order of creation (IPv6 first, then IPv4), and do
        # so even when start-up failed half-way or the task was cancelled.
        for transport in reversed(transports):
            transport.close()
if __name__ == "__main__":
    # Script entry point: configure timestamped INFO logging, then run the
    # server coroutine to completion.
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated, and the loop was previously never closed.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment