Skip to content

Instantly share code, notes, and snippets.

@leimao
Created August 30, 2020 19:57
Show Gist options
  • Save leimao/7d8f30c1832bab8c0dddcf0cd13362d4 to your computer and use it in GitHub Desktop.
Save leimao/7d8f30c1832bab8c0dddcf0cd13362d4 to your computer and use it in GitHub Desktop.
EdgeDB AsyncIO Tutorial ChatBot: https://youtu.be/SyiTd4rLb2s
from __future__ import annotations
from typing import IO
import asyncio
import sys
import contextlib
import aiofiles.threadpool
from chat_streams import split_lines, write, handle_writes
async def handle_reads(reader: asyncio.StreamReader) -> None:
    """Print every line received from the server until ``quit`` arrives.

    Args:
        reader: Stream connected to the chat server.

    The coroutine returns when the peer stops sending data or when a line
    consisting of the ``quit`` command is received.
    """
    async for message in split_lines(reader):
        # Data comes from the network: never let a malformed byte sequence
        # kill the reader task.
        text = message.decode(errors="replace")
        print(f"Received {text!r}")
        # The line splitter may hand lines over without their terminator, so
        # compare the bare command instead of the literal "quit\n" (which
        # could then never match).
        if text.rstrip("\r\n") == "quit":
            break
async def stream_file_to_queue(file: IO[str], queue: asyncio.Queue[bytes]) -> None:
    """Forward every line read from *file* to *queue* as encoded bytes.

    Args:
        file: Text file object to read from (``sys.stdin`` in this script).
        queue: Queue that receives each line, encoded with ``str.encode()``.

    Returns when the wrapped file is exhausted.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() is the recommended accessor and cannot silently
    # create a new loop.
    loop = asyncio.get_running_loop()
    # The file is wrapped with aiofiles so it can be consumed with
    # ``async for`` (presumably reads run in a thread pool, per the module
    # name; confirm against the aiofiles documentation).
    async for message in aiofiles.threadpool.wrap(file, loop=loop):
        await queue.put(message.encode())
async def send_file(file: IO[str]) -> None:
    """Connect to the chat server and pump *file* into the connection.

    Three tasks run concurrently: one prints incoming lines, one drains the
    outgoing queue to the socket, and one copies lines of *file* into that
    queue. As soon as any of them finishes, the others are cancelled.

    Args:
        file: Text file object whose lines are sent to the server.
    """
    outgoing: asyncio.Queue[bytes] = asyncio.Queue()
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)

    workers = [
        asyncio.create_task(handle_reads(reader)),
        asyncio.create_task(handle_writes(writer, outgoing)),
        asyncio.create_task(stream_file_to_queue(file, outgoing)),
    ]
    _finished, unfinished = await asyncio.wait(
        workers,
        return_when=asyncio.FIRST_COMPLETED,
    )

    print("Closing the connection")
    for leftover in unfinished:
        leftover.cancel()
        # Wait for the task to actually unwind; its cancellation is expected.
        with contextlib.suppress(asyncio.CancelledError):
            await leftover
# Script entry point: stream standard input to the chat server.
if __name__ == "__main__":
    asyncio.run(send_file(sys.stdin))
from __future__ import annotations
import asyncio
import contextlib
from typing import Dict, Callable
from chat_streams import split_lines, handle_writes
# Registry of introduced users: maps a nickname to the outgoing-message queue
# of the connection that registered it.
users: Dict[str, asyncio.Queue[bytes]] = {}
async def handle_connection(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Serve one client connection from accept to close.

    A background task drains this connection's outgoing queue to *writer*
    while the command loop processes what the client sends. When the command
    loop ends (normally or with an error) the connection is unregistered and
    the writer task is asked to stop.

    Args:
        reader: Stream to read the client's commands from.
        writer: Stream used to send messages to the client.
    """
    queue: asyncio.Queue[bytes] = asyncio.Queue()
    write_handler = asyncio.create_task(handle_writes(writer, queue))
    ctx = {
        "addr": str(writer.get_extra_info("peername")),
        "my_nick": "",
    }
    try:
        await handle_commands(reader, queue, ctx)
    finally:
        # Unregister by queue identity rather than by ctx["my_nick"]: this
        # removes every nickname that still points at *this* connection and
        # never removes an entry that another connection has since claimed.
        stale_nicks = [nick for nick, user_queue in users.items() if user_queue is queue]
        for nick in stale_nicks:
            del users[nick]
        print("Closing the connection")
        # An empty message is the writer task's signal to shut down.
        await queue.put(b"")
        with contextlib.suppress(asyncio.CancelledError):
            await write_handler
async def handle_commands(reader: asyncio.StreamReader, queue: asyncio.Queue[bytes], ctx: Dict[str, str]) -> None:
    """Run the chat command loop for one client.

    Supported commands, one per line:

    * ``quit`` - echo the command back to the client and stop.
    * ``I'm <nick>`` - register (or change) this client's nickname.
    * ``@<nick> <text>`` - deliver ``<text>`` to the user named ``<nick>``.

    Any other line is logged and ignored.

    Args:
        reader: Stream to read the client's lines from.
        queue: This client's outgoing-message queue.
        ctx: Per-connection state. ``ctx["addr"]`` is used for logging and
            ``ctx["my_nick"]`` is kept up to date with the registered
            nickname so the caller can see it.
    """
    addr = ctx["addr"]
    my_nick = ctx["my_nick"]
    await queue.put(b"Welcome! Please introduce yourself. \n Format: I'm [username]")
    async for message in split_lines(reader):
        # Untrusted network input: a bad byte must not crash the connection.
        text = message.decode(errors="replace")
        print(f"Received {text!r} from {addr!r}")
        # Work on the line without its terminator so commands match whether
        # or not the splitter keeps the trailing newline.
        line = text.rstrip("\r\n")
        if line == "quit":
            await queue.put(message)
            break
        if line.startswith("I'm "):
            new_nick = line[len("I'm "):].strip()
            if not new_nick:
                # An empty nickname would be indistinguishable from
                # "not introduced yet".
                await queue.put(b"Please introduce yourself.")
                continue
            # On rename, drop the previous registration if we still own it.
            if my_nick and users.get(my_nick) is queue:
                del users[my_nick]
            my_nick = new_nick
            # Publish the nickname so the caller's cleanup can see it.
            ctx["my_nick"] = my_nick
            users[my_nick] = queue
        elif line.startswith("@"):
            if not my_nick:
                await queue.put(b"Please introduce yourself.")
                continue
            # partition() tolerates "@nick" with no message text, where
            # unpacking split(" ", 1) would raise ValueError.
            at_nick, _, user_message = line.partition(" ")
            nick = at_nick[1:]
            if nick not in users:
                await queue.put(b"Unknown user: " + nick.encode())
                continue
            user_message = f"<{my_nick}> {user_message}"
            await users[nick].put(user_message.encode())
async def main() -> None:
    """Start the chat server on 127.0.0.1:8888 and serve connections forever."""
    server = await asyncio.start_server(handle_connection, "127.0.0.1", 8888)

    # Report the address of the first listening socket, if there is one.
    if server.sockets:
        bound_address = server.sockets[0].getsockname()
    else:
        bound_address = "unknown"
    print(f"Serving on {bound_address}")

    async with server:
        await server.serve_forever()
# Script entry point: run the chat server.
if __name__ == "__main__":
    asyncio.run(main())
from __future__ import annotations
from typing import AsyncIterator
import asyncio
import sys
async def split_lines(reader: asyncio.StreamReader) -> AsyncIterator[bytes]:
    """Yield the newline-separated lines read from *reader*.

    Lines are yielded without their trailing ``b"\\n"``. Every complete line
    in the buffer is yielded as soon as it is available. Iteration ends at
    end of stream or when the peer resets the connection; a trailing partial
    line (data not terminated by a newline) is yielded last.

    Args:
        reader: Stream to read from, in chunks of up to 100 bytes.

    Yields:
        Each line as ``bytes``, terminator removed.
    """
    buffer = b""
    try:
        # read() returns b"" at end of stream, which ends the loop even when
        # a partial line is still buffered.
        while chunk := await reader.read(100):
            buffer += chunk
            # Everything before the last separator is a complete line; the
            # remainder (possibly empty) stays buffered for the next read.
            *lines, buffer = buffer.split(b"\n")
            for line in lines:
                yield line
    except ConnectionResetError:
        # A reset connection is treated like end of stream.
        pass
    if buffer:
        yield buffer
async def write(writer: asyncio.StreamWriter, message: bytes) -> None:
    """Send *message* as one line over *writer*, deliberately slowly.

    A newline is appended when *message* does not already end with one. The
    bytes are written one at a time with a 0.1 second pause before each, and
    every byte is echoed to stdout as two uppercase hex digits.

    Args:
        writer: Stream to send the bytes on.
        message: Payload to send.
    """
    print("Sending bytes: ", end="")
    payload = message if message.endswith(b"\n") else message + b"\n"

    # Simulate network slowness by sending the payload byte by byte.
    for byte in payload:
        # Simulated latency.
        await asyncio.sleep(0.1)
        writer.write(bytes((byte,)))
        print(f"{byte:02X}", end="")
        sys.stdout.flush()
        if byte == 0x0A:
            # End the hex dump line after a newline byte.
            print()
    # NOTE(review): the source's indentation was lost; the drain is assumed
    # to happen once after the loop rather than after every byte - confirm.
    await writer.drain()
async def handle_writes(writer: asyncio.StreamWriter, queue: asyncio.Queue[bytes]) -> None:
    """Send every message taken from *queue* over *writer*.

    Runs until an empty message (``b""``) is taken from the queue, which
    serves as the shutdown signal. The writer is always closed on exit,
    including when the task is cancelled or a send fails.

    Args:
        writer: Stream to send messages on; closed when this coroutine ends.
        queue: Source of outgoing messages.
    """
    try:
        while (message := await queue.get()) != b"":
            await write(writer, message)
    finally:
        try:
            # Flush whatever is still buffered before closing.
            await writer.drain()
        finally:
            # drain() can raise (e.g. the peer reset the connection) or be
            # cancelled; close the transport regardless so it never leaks.
            writer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment