Last active
July 26, 2023 20:59
-
-
Save albscui/970c84c37eb1e8f13d81aab87f8577b3 to your computer and use it in GitHub Desktop.
Concurrency with coroutines in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Credit: https://youtu.be/MCs5OvhV9S4 | |
Example concurrent server impelmented with coroutines not threads. | |
Main idea is to avoid being blocked by IO. Although we can achieve this with threads, | |
we want to use coroutines instead, because threads are expensive. | |
A coroutine is a function that yields control to the caller before a blocking call. | |
An event loop continuously polls for coroutines and runs them. | |
Use the `select` system call to find "active" sockets, and to "wake up" coroutines. | |
""" | |
import socket
from collections import deque
from select import select
def fib(n):
    """Return the n-th Fibonacci number, computed iteratively.

    The sequence is indexed from zero:

        n      : 0, 1, 2, 3
        fib(n) : 0, 1, 1, 2

    Args:
        n: Index into the sequence.

    Returns:
        The n-th Fibonacci number. Any ``n <= 1`` (negative values included)
        is returned unchanged.
    """
    if n <= 1:
        return n
    a, b = 0, 1
    # Each pass advances the (fib(k-1), fib(k)) pair by one position.
    while n > 1:
        a, b = b, a + b
        n -= 1
    return b
# coroutine for creating connections to client
def fib_server(address):
    """Coroutine that accepts client connections and schedules a handler for each.

    Args:
        address: ``(host, port)`` tuple handed to ``socket.bind``.

    Yields:
        ``("recv", server_socket)`` before every ``accept`` call, so the
        caller can wait until the listening socket is readable before
        resuming this coroutine.
    """
    # The ``with`` block guarantees the listening socket is released when the
    # coroutine is closed or fails, instead of leaking the descriptor.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ssock:
        ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ssock.bind(address)
        ssock.listen(5)
        while True:
            yield "recv", ssock
            csock, addr = ssock.accept()  # blocking
            print("Connection", addr)
            # Hand the new connection to its own coroutine on the task queue.
            tasks.append(fib_handler(csock))
# coroutine for handling request
def fib_handler(client_socket):
    """Coroutine that serves Fibonacci requests on a single client connection.

    Each request is read with one ``recv`` of up to 100 bytes and parsed as an
    integer ``n``; the reply is ``fib(n)`` in ASCII followed by a newline.
    A request that cannot be answered gets an ``error: invalid request`` line
    instead of raising out of the coroutine. The loop ends when the client
    closes its side of the connection (``recv`` returns no data).

    Args:
        client_socket: Connected socket-like object providing ``recv``,
            ``send`` and ``close``.

    Yields:
        ``("recv", client_socket)`` before each ``recv`` and
        ``("send", client_socket)`` before each ``send``.
    """
    try:
        while True:
            yield "recv", client_socket
            req = client_socket.recv(100)  # blocking
            if not req:
                break
            try:
                n = int(req)
                resp = str(fib(n)).encode("ascii") + b"\n"
            except ValueError:
                # Raised for non-numeric input and, on Python 3.11+, when the
                # result has too many digits for int -> str conversion.
                resp = b"error: invalid request\n"
            # send() may accept only part of the buffer; keep going until the
            # whole reply is written, waiting for writability each time.
            while resp:
                yield "send", client_socket
                sent = client_socket.send(resp)  # blocking
                resp = resp[sent:]
    finally:
        # Release the descriptor on normal exit, on error, and when the
        # coroutine is closed early.
        client_socket.close()
    print("client connection closed")
# Scheduler state shared by the coroutines and the event loop.
# tasks: FIFO queue of "active" coroutines that are ready to be resumed.
tasks = deque()
# recv_wait: socket -> coroutine parked until the socket has data to read.
# send_wait: socket -> coroutine parked until the socket can accept data.
recv_wait, send_wait = {}, {}
def run():
    """Run the event loop until no coroutine is runnable or waiting on I/O.

    Each queued coroutine is resumed once; the ``(event_type, socket)`` pair
    it yields decides where it is parked:

    * ``"recv"`` - parked in ``recv_wait`` until the socket is readable.
    * ``"send"`` - parked in ``send_wait`` until the socket is writable.

    When the task queue is empty, ``select`` blocks until at least one parked
    socket is ready and the matching coroutines are moved back to the queue.

    Raises:
        RuntimeError: If a coroutine yields an event type other than
            ``"recv"`` or ``"send"``.

    Any exception other than ``StopIteration`` raised by a coroutine
    propagates out of this function.
    """
    while tasks or recv_wait or send_wait:
        while not tasks:
            # No active tasks to run, wait for I/O. Passing the dicts hands
            # their keys (the sockets) to select.
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))

        task = tasks.popleft()
        try:
            # Only the resume itself can signal completion.
            event_type, sock = next(task)
        except StopIteration:
            print("done")
            continue

        if event_type == "recv":
            recv_wait[sock] = task
        elif event_type == "send":
            send_wait[sock] = task
        else:
            raise RuntimeError("can only recv or send")
if __name__ == "__main__":
    # Script entry point: queue the listening coroutine (all interfaces,
    # port 25000) and run the event loop. The guard keeps an import of this
    # module from starting the server.
    tasks.append(fib_server(("", 25000)))
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment