Skip to content

Instantly share code, notes, and snippets.

@lw
Created June 27, 2021 20:16
Show Gist options
  • Save lw/49767cf4cbe693a129aa1b84a0a0b2d9 to your computer and use it in GitHub Desktop.
Save lw/49767cf4cbe693a129aa1b84a0a0b2d9 to your computer and use it in GitHub Desktop.
Asyncio using generators
"""Demo asynchronous server using only generators.
Test with:
$ netcat -N 127.0.0.1 1234
"""
import enum
import select
import socket
from typing import Generator, NamedTuple, Optional
class Op(enum.Enum):
READ = "r"
WRITE = "w"
class Blocker(NamedTuple):
op: Op
fd: int
class Handler(NamedTuple):
socket: socket.socket
peername: str
coroutine: Generator[Blocker, None, None]
class BlockedHandler(NamedTuple):
handler: Handler
blocker: Blocker
class SomeError(Exception):
pass
def readall(s: socket.socket, peername: str) -> Generator[Blocker, None, bytes]:
bytes_read: List[bytes] = []
while True:
yield Blocker(op=Op.READ, fd=s.fileno()) # give control back to main loop
try:
b = s.recv(1024)
except BlockingIOError:
pass
else:
if len(b) == 0:
print(f"Connection from {peername} closed")
break
print(f"Received {b!r} from {peername}", flush=True)
bytes_read.append(b)
return b"".join(bytes_read)
def writeall(s: socket.socket, peername: str, data: bytes) -> Generator[Blocker, None, None]:
while len(data) > 0:
yield Blocker(op=Op.WRITE, fd=s.fileno()) # give control back to main loop
try:
bytes_sent = s.send(data)
except BlockingIOError:
pass
else:
if bytes_sent == 0:
raise IOError(f"Connection to {peername} closed")
print(f"Sent {data[:bytes_sent]!r} to {peername}",
flush=True)
data = data[bytes_sent:]
def handler(s: socket.socket, peername: str) -> Generator[Blocker, None, None]:
request = yield from readall(s, peername) # give control back to main loop
s.shutdown(socket.SHUT_RD)
response = b"%d\n" % sum(int(x) for x in request.split())
yield from writeall(s, peername, response) # give control back to main loop
s.shutdown(socket.SHUT_WR)
s.close()
def resume_handler(h: Handler) -> Optional[BlockedHandler]:
try:
b = next(h.coroutine)
except StopIteration:
print(f"Handler for {h.peername} terminated")
return None
except Exception as err:
print(f"Handler for {h.peername} killed by {err!r}")
return None
else:
print(
f"Handler for {h.peername} is waiting "
f"to {b.op.name.lower()} on fd {b.fd}")
return BlockedHandler(handler=h, blocker=b)
def kill_handler(h: Handler) -> None:
try:
h.send(SomeError())
except StopIteration:
print(f"Handler for {h.peername} terminated")
except Exception as err:
print(f"Handler for {h.peername} killed by {err!r}")
def main():
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 1234))
s.listen(0)
handlers: List[BlockedHandler] = []
while True:
print(
f"{len(handlers)} handlers waiting for I/O (plus the root socket)")
ready_read, ready_write, have_error = select.select(
([s.fileno()]
+ [bh.blocker.fd for bh in handlers if bh.blocker.op is Op.READ]),
[bh.blocker.fd for bh in handlers if bh.blocker.op is Op.WRITE],
[bh.blocker.fd for bh in handlers],
)
new_handlers: List[BlockedHandler] = []
if s.fileno() in have_error:
raise RuntimeError(f"Error in parent socket")
elif s.fileno() in ready_read:
try:
child_s, _ = s.accept()
except BlockingIOError:
pass
else:
child_s.setblocking(False)
peername = repr(child_s.getpeername())
print(f"{peername} connected")
new_h = Handler(
socket=child_s,
peername=peername,
coroutine=handler(child_s, peername),
)
new_bh = resume_handler(new_h)
if new_bh is not None:
new_handlers.append(new_bh)
for bh in handlers:
if bh.blocker.fd in have_error:
print(
f"Handler for {bh.handler.peername} got error "
f"while waiting to {bh.blocker.op.name.lower()} on fd {bh.blocker.fd}")
kill_handler(bh.handler)
elif (bh.blocker.op is Op.READ and bh.blocker.fd in ready_read) \
or (bh.blocker.op is Op.WRITE and bh.blocker.fd in ready_write):
print(
f"Handler for {bh.handler.peername} is now able "
f"to {bh.blocker.op.name.lower()} on fd {bh.blocker.fd}")
new_bh = resume_handler(bh.handler)
if new_bh is not None:
new_handlers.append(new_bh)
else:
new_handlers.append(bh)
handlers = new_handlers
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment