Skip to content

Instantly share code, notes, and snippets.

@GtTmy
Last active November 3, 2023 14:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GtTmy/723bcc93326dee23e6dc957b6a3465a9 to your computer and use it in GitHub Desktop.
Save GtTmy/723bcc93326dee23e6dc957b6a3465a9 to your computer and use it in GitHub Desktop.
ncで話すための対向
import asyncio
import aioconsole
from typing import cast
async def print_received_data(reader: asyncio.StreamReader):
    """Echo every line received from the peer to the local console.

    Lines are read from ``reader`` one at a time and printed unchanged
    (no extra newline is appended) with ``aioconsole.aprint``.  The
    coroutine returns once the peer has closed its side of the connection.

    Args:
        reader: Stream from which the peer's bytes are read.
    """
    while True:
        # readline() reads up to and including the next newline, or up to EOF,
        # so readuntil() with IncompleteReadError handling is not needed.
        data = await reader.readline()
        if not data:
            # An empty result means EOF.  Without this check every further
            # readline() call returns b"" immediately and the loop spins
            # forever after the peer disconnects.
            break
        # The peer may send arbitrary bytes; substitute undecodable ones
        # instead of letting UnicodeDecodeError kill the connection handler.
        await aioconsole.aprint(data.decode(errors="replace"), end="")
async def send_data(writer: asyncio.StreamWriter):
    """Forward lines typed on the local console to the peer.

    Each line obtained from ``aioconsole.ainput`` is terminated with a
    newline, encoded with the default codec (UTF-8) and written to
    ``writer``.  The coroutine returns when console input is exhausted or
    the connection is lost while flushing.

    Args:
        writer: Stream on which the typed lines are sent.
    """
    while True:
        try:
            line = cast(str, await aioconsole.ainput())
        except EOFError:
            # Console input has ended (e.g. Ctrl-D or a closed pipe); stop
            # instead of letting the error escape as an unhandled task
            # exception.
            break
        writer.write(f"{line}\n".encode())
        try:
            # drain() applies flow control and is where a lost connection
            # is reported.
            await writer.drain()
        except ConnectionError:
            # The peer went away; there is nobody left to send to.
            break
async def handle_packet(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one client connection: relay data in both directions.

    Runs ``print_received_data`` (peer -> console) and ``send_data``
    (console -> peer) concurrently.  As soon as either direction finishes
    or fails, the other one is cancelled and the connection is closed, so
    a finished session does not leave a task or socket behind.

    Args:
        reader: Stream carrying bytes sent by the client.
        writer: Stream used to send bytes to the client.

    Raises:
        Exception: Whatever the first finished direction raised, if anything.
    """
    tasks = [
        asyncio.create_task(print_received_data(reader)),
        asyncio.create_task(send_data(writer)),
    ]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            # Re-raise a failure from the finished direction, as gather() did.
            task.result()
    finally:
        # cancel() is a no-op on tasks that are already done.
        for task in tasks:
            task.cancel()
        # Let the cancellations be processed; return_exceptions=True keeps
        # CancelledError from the cancelled task out of this cleanup path.
        await asyncio.gather(*tasks, return_exceptions=True)
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The connection was already broken; nothing more to clean up.
            pass
async def main():
    """Start the TCP server on 127.0.0.1:8888 and serve connections.

    Every accepted connection is handed to ``handle_packet``.  A start-up
    message is printed once the listening socket has been created.
    """
    listener = await asyncio.start_server(
        handle_packet,
        host='127.0.0.1',
        port=8888,
    )
    print("Start server")
    # The context manager closes the listening socket when serving stops.
    async with listener:
        await listener.serve_forever()
# Script entry point: run the server only when executed directly, not when
# this module is imported.
if __name__ == "__main__":
    entry_point = main()
    asyncio.run(entry_point)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment