Skip to content

Instantly share code, notes, and snippets.

@albscui
Last active July 26, 2023 20:59
Show Gist options
  • Save albscui/970c84c37eb1e8f13d81aab87f8577b3 to your computer and use it in GitHub Desktop.
Save albscui/970c84c37eb1e8f13d81aab87f8577b3 to your computer and use it in GitHub Desktop.
Concurrency with coroutines in Python
"""
Credit: https://youtu.be/MCs5OvhV9S4
Example concurrent server implemented with coroutines, not threads.
Main idea is to avoid being blocked by IO. Although we can achieve this with threads,
we want to use coroutines instead, because threads are expensive.
A coroutine is a function that yields control to the caller before a blocking call.
An event loop continuously polls for coroutines and runs them.
Use the `select` system call to find "active" sockets, and to "wake up" coroutines.
"""
import socket
from collections import deque
from select import select
def fib(n):
    """Return the n-th Fibonacci number, computed iteratively.

    Index:  0, 1, 2, 3, ...
    Value:  0, 1, 1, 2, ...

    Any ``n`` less than or equal to 1 is returned unchanged.
    """
    if n <= 1:
        return n
    previous, current = 0, 1
    # Starting from fib(1), n - 1 further steps are needed to reach fib(n).
    remaining = n - 1
    while remaining > 0:
        previous, current = current, previous + current
        remaining -= 1
    return current
# coroutine for creating connections to client
def fib_server(address):
    """Coroutine that accepts client connections and schedules a handler for each.

    Args:
        address: ``(host, port)`` tuple handed to ``socket.bind``.

    Yields:
        ``("recv", listening_socket)`` before every ``accept`` call, so the
        event loop can resume this coroutine once a connection is pending.

    The listening socket is closed when the coroutine is closed or finalized,
    or when setting it up (``bind``/``listen``) fails.
    """
    # ``with`` guarantees the listening socket is released on any exit path,
    # including GeneratorExit raised by ``generator.close()``.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ssock:
        ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ssock.bind(address)
        ssock.listen(5)
        while True:
            # Hand control back until the event loop reports the socket readable.
            yield "recv", ssock
            csock, addr = ssock.accept()
            print("Connection", addr)
            # Each client gets its own handler coroutine on the shared task queue.
            tasks.append(fib_handler(csock))
# coroutine for handling request
def fib_handler(client_socket):
while True:
yield "recv", client_socket
req = client_socket.recv(100) # blocking
if not req:
break
n = int(req)
result = fib(n)
resp = str(result).encode("ascii") + b"\n"
yield "send", client_socket
client_socket.send(resp) # blocking
print("client connection closed")
# Queue of coroutines that are ready to be resumed by the event loop.
tasks = deque()
# socket -> coroutine parked until that socket is reported readable.
recv_wait = dict()
# socket -> coroutine parked until that socket is reported writable.
send_wait = dict()
def run():
    """Event loop: resume ready coroutines and park them on the socket they wait for.

    Runs until the task queue and both wait maps are empty. Each coroutine
    must yield ``(event_type, socket)`` with ``event_type`` equal to
    ``"recv"`` or ``"send"``.

    Raises:
        RuntimeError: if a coroutine yields any other event type.
    """
    while tasks or recv_wait or send_wait:
        while not tasks:
            # Nothing runnable: block in select() until a parked socket is ready,
            # then move its coroutine back onto the task queue.
            readable, writable, _ = select(recv_wait, send_wait, [])
            tasks.extend(recv_wait.pop(ready) for ready in readable)
            tasks.extend(send_wait.pop(ready) for ready in writable)

        current = tasks.popleft()
        try:
            kind, sock = next(current)
        except StopIteration:
            # The coroutine finished; it is simply not rescheduled.
            print("done")
            continue

        if kind == "recv":
            recv_wait[sock] = current
            continue
        if kind == "send":
            send_wait[sock] = current
            continue
        raise RuntimeError("can only recv or send")
# Schedule the accepting coroutine (empty host, port 25000) and start the event loop.
listener = fib_server(("", 25000))
tasks.append(listener)
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment