Skip to content

Instantly share code, notes, and snippets.

@sorz
Created May 5, 2023 07:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sorz/8b0fb618a4f60ce5a4e83171d4087318 to your computer and use it in GitHub Desktop.
Save sorz/8b0fb618a4f60ce5a4e83171d4087318 to your computer and use it in GitHub Desktop.
Simple TCP server that echoes everything it receives, prepended with HTTP response headers.
#!/usr/bin/env python3
import asyncio
from asyncio import StreamReader, StreamWriter
# Address the server binds to, as (host, port); '::' is the IPv6 wildcard.
LISTEN = ('::', 8080)
# Seconds to wait for more client data before ending the connection.
WAIT_SECS = 3
# Maximum number of bytes requested from the client per read.
BUF_SIZE = 4096
# Status line and header fields sent to every client ahead of the echoed bytes.
HTTP_RESPONSE = [
    'HTTP/1.1 200 OK',
    'Server: http_echo',
    # The connection option defined by HTTP is "close"; "Closed" is not a
    # recognised token, so clients would not treat it as a close signal.
    'Connection: close',
    'Content-Type: text/plain',
]
async def handle_client(reader: StreamReader, writer: StreamWriter):
    """Send the canned HTTP response head, then echo the client's bytes back.

    The head is built by joining HTTP_RESPONSE with CRLF and terminating it
    with an empty line. After that, every chunk read from the client (at most
    BUF_SIZE bytes per read) is written straight back. Echoing stops when the
    client reaches EOF, when no data arrives within WAIT_SECS seconds, or when
    the connection fails. The writer is always closed before returning.

    Args:
        reader: Stream delivering the bytes sent by the client.
        writer: Stream used to send the head and the echoed bytes.
    """
    peer = writer.get_extra_info('peername')
    print(f'Accept {peer}')
    head = '\r\n'.join(HTTP_RESPONSE) + '\r\n\r\n'
    try:
        writer.write(head.encode())
        await writer.drain()
        while not reader.at_eof():
            data = await asyncio.wait_for(reader.read(BUF_SIZE), WAIT_SECS)
            if data:
                writer.write(data)
                await writer.drain()
    except asyncio.TimeoutError:
        # Idle client: stop echoing and close the connection normally.
        # asyncio.TimeoutError is caught (rather than the builtin) because
        # the two are only the same class from Python 3.11 onwards.
        pass
    except ConnectionError as err:
        # The peer reset or dropped the connection while we were writing.
        print(f'Lost {peer}: {err!r}')
    finally:
        # Always release the transport, even when the echo loop failed.
        try:
            if writer.can_write_eof():
                writer.write_eof()
        except OSError:
            # Half-closing a socket the peer already tore down can fail.
            pass
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # A connection that died earlier may re-raise its error here.
            pass
async def main():
    """Start the echo server on LISTEN, report its sockets, and serve forever."""
    host, port = LISTEN
    srv = await asyncio.start_server(handle_client, host, port)
    # Report every socket the server actually bound.
    bound = [str(sock.getsockname()) for sock in srv.sockets]
    addrs = ', '.join(bound)
    print(f'Listen on {addrs}')
    # The context manager closes the server once serving stops.
    async with srv:
        await srv.serve_forever()
if __name__ == '__main__':
    # Script entry point: run the server until it is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; exit without a traceback.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment