Skip to content

Instantly share code, notes, and snippets.

@zeheater
Last active February 25, 2021 12:48
Show Gist options
  • Save zeheater/3ec52dc0ae96bd328036f928a6c31615 to your computer and use it in GitHub Desktop.
Save zeheater/3ec52dc0ae96bd328036f928a6c31615 to your computer and use it in GitHub Desktop.
Python Async UDP Server
import sys
import asyncio
import concurrent.futures
from hexdump import hexdump
class DebugServer(asyncio.DatagramProtocol):
    """Datagram protocol that dumps each received payload for debugging.

    Every datagram is handed to ``hexdump`` followed by a blank line on
    stdout; transport errors and connection loss are reported with ``print``.
    """

    def __init__(self):
        self.transport = None
        # Strong references to in-flight print tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it has run.
        self._tasks = set()

    def connection_made(self, transport):
        """Remember the transport handed over by the event loop."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Schedule a dump of ``data``; the sender address ``addr`` is unused."""
        task = asyncio.get_running_loop().create_task(self.aprint(data))
        self._tasks.add(task)
        # Drop the reference once the task has finished.
        task.add_done_callback(self._tasks.discard)

    def error_received(self, exc):
        """Report an error delivered by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Report that the endpoint has been closed."""
        print("Connection closed")

    async def aprint(self, data):
        """Dump ``data`` via ``hexdump`` and print a separating blank line."""
        hexdump(data)
        print('')
async def ainput(prompt: str) -> str:
    """Write ``prompt`` to stdout and read one line from stdin asynchronously.

    The blocking write/flush/readline sequence runs in the event loop's
    default executor, so the loop keeps servicing other work while waiting
    for the user.

    Args:
        prompt: Text written to stdout (and flushed) before reading.

    Returns:
        The line read from stdin with surrounding whitespace stripped.

    Raises:
        EOFError: If stdin is at end-of-file, mirroring the builtin
            ``input()``. Without this, a closed stdin would look like an
            endless stream of empty lines to the caller.
    """

    def _prompt_and_read() -> str:
        # Look up sys.stdout / sys.stdin at call time so redirection works.
        sys.stdout.write(prompt)
        sys.stdout.flush()
        return sys.stdin.readline()

    line = await asyncio.get_running_loop().run_in_executor(None, _prompt_and_read)
    if not line:
        # readline() returns '' only at EOF; a blank line is '\n'.
        raise EOFError("EOF when reading a line")
    return line.strip()
async def main():
    """Serve UDP on 127.0.0.1:56565 until the user enters ``q``.

    Creates a datagram endpoint backed by ``DebugServer`` and then prompts
    on stdin repeatedly; the transport is closed when the loop ends, whether
    it ends normally or through an exception.
    """
    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_datagram_endpoint(
        DebugServer, local_addr=('127.0.0.1', 56565)
    )
    try:
        while await ainput('Press q to exit\n') != 'q':
            pass
    except EOFError:
        # An EOFError from the prompt (stdin closed) is treated as quit.
        pass
    finally:
        # Always release the socket, even if prompting failed.
        transport.close()
if __name__ == "__main__":
    # Start the server only when run as a script, so importing this module
    # does not bind the UDP socket or block on stdin.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment