Created
June 27, 2021 20:16
-
-
Save lw/49767cf4cbe693a129aa1b84a0a0b2d9 to your computer and use it in GitHub Desktop.
Asyncio using generators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Demo asynchronous server using only generators. | |
Test with: | |
$ netcat -N 127.0.0.1 1234 | |
""" | |
import enum | |
import select | |
import socket | |
from typing import Generator, NamedTuple, Optional | |
class Op(enum.Enum): | |
READ = "r" | |
WRITE = "w" | |
class Blocker(NamedTuple): | |
op: Op | |
fd: int | |
class Handler(NamedTuple): | |
socket: socket.socket | |
peername: str | |
coroutine: Generator[Blocker, None, None] | |
class BlockedHandler(NamedTuple): | |
handler: Handler | |
blocker: Blocker | |
class SomeError(Exception): | |
pass | |
def readall(s: socket.socket, peername: str) -> Generator[Blocker, None, bytes]: | |
bytes_read: List[bytes] = [] | |
while True: | |
yield Blocker(op=Op.READ, fd=s.fileno()) # give control back to main loop | |
try: | |
b = s.recv(1024) | |
except BlockingIOError: | |
pass | |
else: | |
if len(b) == 0: | |
print(f"Connection from {peername} closed") | |
break | |
print(f"Received {b!r} from {peername}", flush=True) | |
bytes_read.append(b) | |
return b"".join(bytes_read) | |
def writeall(s: socket.socket, peername: str, data: bytes) -> Generator[Blocker, None, None]: | |
while len(data) > 0: | |
yield Blocker(op=Op.WRITE, fd=s.fileno()) # give control back to main loop | |
try: | |
bytes_sent = s.send(data) | |
except BlockingIOError: | |
pass | |
else: | |
if bytes_sent == 0: | |
raise IOError(f"Connection to {peername} closed") | |
print(f"Sent {data[:bytes_sent]!r} to {peername}", | |
flush=True) | |
data = data[bytes_sent:] | |
def handler(s: socket.socket, peername: str) -> Generator[Blocker, None, None]: | |
request = yield from readall(s, peername) # give control back to main loop | |
s.shutdown(socket.SHUT_RD) | |
response = b"%d\n" % sum(int(x) for x in request.split()) | |
yield from writeall(s, peername, response) # give control back to main loop | |
s.shutdown(socket.SHUT_WR) | |
s.close() | |
def resume_handler(h: Handler) -> Optional[BlockedHandler]: | |
try: | |
b = next(h.coroutine) | |
except StopIteration: | |
print(f"Handler for {h.peername} terminated") | |
return None | |
except Exception as err: | |
print(f"Handler for {h.peername} killed by {err!r}") | |
return None | |
else: | |
print( | |
f"Handler for {h.peername} is waiting " | |
f"to {b.op.name.lower()} on fd {b.fd}") | |
return BlockedHandler(handler=h, blocker=b) | |
def kill_handler(h: Handler) -> None: | |
try: | |
h.send(SomeError()) | |
except StopIteration: | |
print(f"Handler for {h.peername} terminated") | |
except Exception as err: | |
print(f"Handler for {h.peername} killed by {err!r}") | |
def main(): | |
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) | |
s.setblocking(False) | |
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
s.bind(("127.0.0.1", 1234)) | |
s.listen(0) | |
handlers: List[BlockedHandler] = [] | |
while True: | |
print( | |
f"{len(handlers)} handlers waiting for I/O (plus the root socket)") | |
ready_read, ready_write, have_error = select.select( | |
([s.fileno()] | |
+ [bh.blocker.fd for bh in handlers if bh.blocker.op is Op.READ]), | |
[bh.blocker.fd for bh in handlers if bh.blocker.op is Op.WRITE], | |
[bh.blocker.fd for bh in handlers], | |
) | |
new_handlers: List[BlockedHandler] = [] | |
if s.fileno() in have_error: | |
raise RuntimeError(f"Error in parent socket") | |
elif s.fileno() in ready_read: | |
try: | |
child_s, _ = s.accept() | |
except BlockingIOError: | |
pass | |
else: | |
child_s.setblocking(False) | |
peername = repr(child_s.getpeername()) | |
print(f"{peername} connected") | |
new_h = Handler( | |
socket=child_s, | |
peername=peername, | |
coroutine=handler(child_s, peername), | |
) | |
new_bh = resume_handler(new_h) | |
if new_bh is not None: | |
new_handlers.append(new_bh) | |
for bh in handlers: | |
if bh.blocker.fd in have_error: | |
print( | |
f"Handler for {bh.handler.peername} got error " | |
f"while waiting to {bh.blocker.op.name.lower()} on fd {bh.blocker.fd}") | |
kill_handler(bh.handler) | |
elif (bh.blocker.op is Op.READ and bh.blocker.fd in ready_read) \ | |
or (bh.blocker.op is Op.WRITE and bh.blocker.fd in ready_write): | |
print( | |
f"Handler for {bh.handler.peername} is now able " | |
f"to {bh.blocker.op.name.lower()} on fd {bh.blocker.fd}") | |
new_bh = resume_handler(bh.handler) | |
if new_bh is not None: | |
new_handlers.append(new_bh) | |
else: | |
new_handlers.append(bh) | |
handlers = new_handlers | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment