Skip to content

Instantly share code, notes, and snippets.


sooop/ Secret

Last active Jun 19, 2020
What would you like to do?
import asyncio
from random import random
_port = 7770
# asyncio socket server/client
async def run_client(host: str, port: int):
# 서버와의 연결을 생성합니다.
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
reader, writer = await asyncio.open_connection(host, port)
# show connection info
print("[C] connected")
# 루프를 돌면서 입력받은 내용을 서버로 보내고,
# 응답을 받으면 출력합니다.
while True:
line = input("[C] enter message: ")
if not line:
# 입력받은 내용을 서버로 전송
payload = line.encode()
await writer.drain()
print(f"[C] sent: {len(payload)} bytes.\n")
# 서버로부터 받은 응답을 표시
data = await # type: bytes
print(f"[C] received: {len(data)} bytes")
print(f"[C] message: {data.decode()}")
# 연결을 종료합니다.
print("[C] closing connection...")
await writer.wait_closed()
async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
while True:
# 클라이언트가 보낸 내용을 받기
data: bytes = await
# 받은 내용을 출력하고,
# 가공한 내용을 다시 내보내기
peername = writer.get_extra_info('peername')
print(f"[S] received: {len(data)} bytes from {peername}")
mes = data.decode()
print(f"[S] message: {mes}")
res = mes.upper()[::-1]
await asyncio.sleep(random() * 2)
await writer.drain()
async def run_server():
# 서버를 생성하고 실행
server = await asyncio.start_server(handler, host="", port=_port)
async with server:
# serve_forever()를 호출해야 클라이언트와 연결을 수락합니다.
await server.serve_forever()
async def main():
await asyncio.wait([run_server(), run_client("", _port)])
if __name__ == "__main__":
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment