Skip to content

Instantly share code, notes, and snippets.

@dabeaz
Created October 17, 2019 17:46
Show Gist options
  • Star 82 You must be signed in to star a gist
  • Fork 37 You must be signed in to fork a gist
  • Save dabeaz/f86ded8d61206c757c5cd4dbb5109f74 to your computer and use it in GitHub Desktop.
Save dabeaz/f86ded8d61206c757c5cd4dbb5109f74 to your computer and use it in GitHub Desktop.
"Build Your Own Async" Workshop - PyCon India - October 14, 2019 - https://www.youtube.com/watch?v=Y4Gt3Xjd7G8
# aproducer.py
#
# Async Producer-consumer problem.
# Challenge: How to implement the same functionality, but no threads.
import time
from collections import deque
import heapq
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.sleeping = [] # Sleeping functions
self.sequence = 0
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
# Priority queue
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# Find the nearest deadline
deadline, _, func = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft()
func()
sched = Scheduler() # Behind scenes scheduler object
# -----
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque() # All getters waiting for data
def put(self, item):
self.items.append(item)
if self.waiting:
func = self.waiting.popleft()
# Do we call it right away? No. Schedule it to be called.
sched.call_soon(func)
def get(self, callback):
# Wait until an item is available. Then return it
if self.items:
callback(self.items.popleft())
else:
self.waiting.append(lambda: self.get(callback))
def producer(q, count):
def _run(n):
if n < count:
print('Producing', n)
q.put(n)
sched.call_later(1, lambda: _run(n+1))
else:
print('Producer done')
q.put(None)
_run(0)
def consumer(q):
def _consume(item):
if item is None:
print('Consumer done')
else:
print('Consuming', item)
sched.call_soon(lambda: consumer(q))
q.get(callback=_consume)
q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q,))
sched.run()
# aproducer_error.py
#
# Example of returning results + errors in callback based code.
import time
from collections import deque
import heapq
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.sleeping = [] # Sleeping functions
self.sequence = 0
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
# Priority queue
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# Find the nearest deadline
deadline, _, func = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft()
func()
sched = Scheduler() # Behind scenes scheduler object
# -----
# Class used to communicate both a normal value or an exception
class Result:
def __init__(self, value=None, exc=None):
self.value = value
self.exc = exc
def result(self):
if self.exc:
raise self.exc
else:
return self.value
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque() # All getters waiting for data
self._closed = False # Can queue be used anymore?
def close(self):
self._closed = True
if self.waiting and not self.items:
for func in self.waiting:
sched.call_soon(func)
def put(self, item):
if self._closed:
raise QueueClosed()
self.items.append(item)
if self.waiting:
func = self.waiting.popleft()
# Do we call it right away?
sched.call_soon(func)
def get(self, callback):
# Wait until an item is available. Then return it
# Question: How does a closed queue interact with get()
if self.items:
callback(Result(value=self.items.popleft())) # Good result
else:
# No items available (must wait)
if self._closed:
callback(Result(exc=QueueClosed())) # Error result
else:
self.waiting.append(lambda: self.get(callback))
class QueueClosed(Exception):
pass
def producer(q, count):
def _run(n):
if n < count:
print('Producing', n)
q.put(n)
sched.call_later(1, lambda: _run(n+1))
else:
print('Producer done')
q.close() # Means no more items will be produced
_run(0)
def consumer(q):
def _consume(result):
try:
item = result.result()
print('Consuming', item)
sched.call_soon(lambda: consumer(q))
except QueueClosed:
print("Consumer done")
q.get(callback=_consume)
q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q,))
sched.run()
# asynco.py
#
# A basic asynchronous scheduler with support for time
import time
from collections import deque
import heapq
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.sleeping = [] # Sleeping functions
self.sequence = 0 # Used to break ties in priority queue
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# Find the nearest deadline
deadline, _, func = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft()
func()
sched = Scheduler() # Behind scenes scheduler object
def countdown(n):
if n > 0:
print('Down', n)
# time.sleep(4)
sched.call_later(4, lambda: countdown(n-1))
def countup(stop):
def _run(x):
if x < stop:
print('Up', x)
# time.sleep(1)
sched.call_later(1, lambda: _run(x+1))
_run(0)
sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
# coro_callback.py
#
# An example of how to implement coroutine based concurrency layered
# on top of a callback-based scheduler.
import time
from collections import deque
import heapq
# Callback based scheduler (from earlier)
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.sleeping = [] # Sleeping functions
self.sequence = 0
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
# Priority queue
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# Find the nearest deadline
deadline, _, func = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft()
func()
# Coroutine-based functions
def new_task(self, coro):
self.ready.append(Task(coro)) # Wrapped coroutine
async def sleep(self, delay):
self.call_later(delay, self.current)
self.current = None
await switch() # Switch to a new task
# Class that wraps a coroutine--making it look like a callback
class Task:
def __init__(self, coro):
self.coro = coro # "Wrapped coroutine"
# Make it look like a callback
def __call__(self):
try:
# Driving the coroutine as before
sched.current = self
self.coro.send(None)
if sched.current:
sched.ready.append(self)
except StopIteration:
pass
class Awaitable:
def __await__(self):
yield
def switch():
return Awaitable()
sched = Scheduler() # Background scheduler object
# ----------------
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque()
async def put(self, item):
self.items.append(item)
if self.waiting:
sched.ready.append(self.waiting.popleft())
async def get(self):
if not self.items:
self.waiting.append(sched.current) # Put myself to sleep
sched.current = None # "Disappear"
await switch() # Switch to another task
return self.items.popleft()
# Coroutine-based tasks
async def producer(q, count):
for n in range(count):
print('Producing', n)
await q.put(n)
await sched.sleep(1)
print('Producer done')
await q.put(None) # "Sentinel" to shut down
async def consumer(q):
while True:
item = await q.get()
if item is None:
break
print('Consuming', item)
print('Consumer done')
q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
# Call-back based tasks
def countdown(n):
if n > 0:
print('Down', n)
# time.sleep(4) # Blocking call (nothing else can run)
sched.call_later(4, lambda: countdown(n-1))
def countup(stop):
def _run(x):
if x < stop:
print('Up', x)
# time.sleep(1)
sched.call_later(1, lambda: _run(x+1))
_run(0)
sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
# Build Your Own Async
#
# David Beazley (@dabeaz)
# https://www.dabeaz.com
#
# Originally presented at PyCon India, Chennai, October 14, 2019
import time
def countdown(n):
while n > 0:
print('Down', n)
time.sleep(1)
n -= 1
def countup(stop):
x = 0
while x < stop:
print('Up', x)
time.sleep(1)
x += 1
# Example of sequential execution
countdown(5)
countup(5)
# Example of concurrent execution (via threads)
import threading
threading.Thread(target=countdown, args=(5,)).start()
threading.Thread(target=countup, args=(5,)).start()
# io_scheduler.py
#
# An example of implementing I/O operations in the scheduler
import time
from collections import deque
import heapq
from select import select
# Callback based scheduler (from earlier)
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.sleeping = [] # Sleeping functions
self.sequence = 0
self._read_waiting = { }
self._write_waiting = { }
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
# Priority queue
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def read_wait(self, fileno, func):
# Trigger func() when fileno is readable
self._read_waiting[fileno] = func
def write_wait(self, fileno, func):
# Trigger func() when fileno is writeable
self._write_waiting[fileno] = func
def run(self):
while (self.ready or self.sleeping or self._read_waiting or self._write_waiting):
if not self.ready:
# Find the nearest deadline
if self.sleeping:
deadline, _, func = self.sleeping[0]
timeout = deadline - time.time()
if timeout < 0:
timeout = 0
else:
timeout = None # Wait forever
# Wait for I/O (and sleep)
can_read, can_write, _ = select(self._read_waiting,
self._write_waiting, [], timeout)
for fd in can_read:
self.ready.append(self._read_waiting.pop(fd))
for fd in can_write:
self.ready.append(self._write_waiting.pop(fd))
# Check for sleeping tasks
now = time.time()
while self.sleeping:
if now > self.sleeping[0][0]:
self.ready.append(heapq.heappop(self.sleeping)[2])
else:
break
while self.ready:
func = self.ready.popleft()
func()
def new_task(self, coro):
self.ready.append(Task(coro)) # Wrapped coroutine
async def sleep(self, delay):
self.call_later(delay, self.current)
self.current = None
await switch() # Switch to a new task
async def recv(self, sock, maxbytes):
self.read_wait(sock, self.current)
self.current = None
await switch()
return sock.recv(maxbytes)
async def send(self, sock, data):
self.write_wait(sock, self.current)
self.current = None
await switch()
return sock.send(data)
async def accept(self, sock):
self.read_wait(sock, self.current)
self.current = None
await switch()
return sock.accept()
class Task:
def __init__(self, coro):
self.coro = coro # "Wrapped coroutine"
# Make it look like a callback
def __call__(self):
try:
# Driving the coroutine as before
sched.current = self
self.coro.send(None)
if sched.current:
sched.ready.append(self)
except StopIteration:
pass
class Awaitable:
def __await__(self):
yield
def switch():
return Awaitable()
sched = Scheduler() # Background scheduler object
# ----------------
from socket import *
async def tcp_server(addr):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(1)
while True:
client, addr = await sched.accept(sock)
print('Connection from', addr)
sched.new_task(echo_handler(client))
async def echo_handler(sock):
while True:
data = await sched.recv(sock, 10000)
if not data:
break
await sched.send(sock, b'Got:' + data)
print('Connection closed')
sock.close()
sched.new_task(tcp_server(('', 30000)))
sched.run()
# producer.py
#
# Producer-consumer problem with threads
import queue
import threading
import time
def producer(q, count):
for n in range(count):
print('Producing', n)
q.put(n)
time.sleep(1)
print('Producer done')
q.put(None) # "Sentinel" to shut down
def consumer(q):
while True:
item = q.get()
if item is None:
break
print('Consuming', item)
print('Consumer done')
q = queue.Queue() # Thread-safe queue
threading.Thread(target=producer, args=(q, 10)).start()
threading.Thread(target=consumer, args=(q,)).start()
# yieldo.py
#
# Example of a coroutine-based scheduler
import time
from collections import deque
import heapq
# Plumbing that makes the "await" statement work. We provide
# a single function "switch" that is used by the schedule to
# switch tasks.
class Awaitable:
def __await__(self):
yield
def switch():
return Awaitable()
class Scheduler:
def __init__(self):
self.ready = deque()
self.sleeping = [ ]
self.current = None # Currently executing generator
self.sequence = 0
async def sleep(self, delay):
deadline = time.time() + delay
self.sequence += 1
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
self.current = None # "Disappear"
await switch() # Switch tasks
def new_task(self, coro):
self.ready.append(coro)
def run(self):
while self.ready or self.sleeping:
if not self.ready:
deadline, _, coro = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(coro)
self.current = self.ready.popleft()
# Drive as a generator
try:
self.current.send(None) # Send to a coroutine
if self.current:
self.ready.append(self.current)
except StopIteration:
pass
sched = Scheduler() # Background scheduler object
# ---- Example code
async def countdown(n):
while n > 0:
print('Down', n)
await sched.sleep(4)
n -= 1
async def countup(stop):
x = 0
while x < stop:
print('Up', x)
await sched.sleep(1)
x += 1
sched.new_task(countdown(5))
sched.new_task(countup(20))
sched.run()
# yproducer.py
#
# Coroutine producer-consumer problem. With async-await
import time
from collections import deque
import heapq
class Awaitable:
def __await__(self):
yield
def switch():
return Awaitable()
class Scheduler:
def __init__(self):
self.ready = deque()
self.sleeping = [ ]
self.current = None # Currently executing generator
self.sequence = 0
async def sleep(self, delay):
deadline = time.time() + delay
self.sequence += 1
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
self.current = None # "Disappear"
await switch() # Switch tasks
def new_task(self, coro):
self.ready.append(coro)
def run(self):
while self.ready or self.sleeping:
if not self.ready:
deadline, _, coro = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(coro)
self.current = self.ready.popleft()
# Drive as a generator
try:
self.current.send(None) # Send to a coroutine
if self.current:
self.ready.append(self.current)
except StopIteration:
pass
sched = Scheduler() # Background scheduler object
# ----------------
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque()
async def put(self, item):
self.items.append(item)
if self.waiting:
sched.ready.append(self.waiting.popleft())
async def get(self):
if not self.items:
self.waiting.append(sched.current) # Put myself to sleep
sched.current = None # "Disappear"
await switch() # Switch to another task
return self.items.popleft()
async def producer(q, count):
for n in range(count):
print('Producing', n)
await q.put(n)
await sched.sleep(1)
print('Producer done')
await q.put(None) # "Sentinel" to shut down
async def consumer(q):
while True:
item = await q.get()
if item is None:
break
print('Consuming', item)
print('Consumer done')
q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
sched.run()
# yproducer_error.py
#
# Example of error handling with coroutines
import time
from collections import deque
import heapq
class Awaitable:
def __await__(self):
yield
def switch():
return Awaitable()
class Scheduler:
def __init__(self):
self.ready = deque()
self.sleeping = [ ]
self.current = None # Currently executing generator
self.sequence = 0
async def sleep(self, delay):
deadline = time.time() + delay
self.sequence += 1
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
self.current = None # "Disappear"
await switch() # Switch tasks
def new_task(self, coro):
self.ready.append(coro)
def run(self):
while self.ready or self.sleeping:
if not self.ready:
deadline, _, coro = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(coro)
self.current = self.ready.popleft()
# Drive as a generator
try:
self.current.send(None) # Send to a coroutine
if self.current:
self.ready.append(self.current)
except StopIteration:
pass
sched = Scheduler() # Background scheduler object
# ----------------
class QueueClosed(Exception):
pass
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque()
self._closed = False
def close(self):
self._closed = True
if self.waiting and not self.items:
sched.ready.append(self.waiting.popleft()) # Reschedule waiting tasks
async def put(self, item):
if self._closed:
raise QueueClosed()
self.items.append(item)
if self.waiting:
sched.ready.append(self.waiting.popleft())
async def get(self):
while not self.items:
if self._closed:
raise QueueClosed()
self.waiting.append(sched.current) # Put myself to sleep
sched.current = None # "Disappear"
await switch() # Switch to another task
return self.items.popleft()
async def producer(q, count):
for n in range(count):
print('Producing', n)
await q.put(n)
await sched.sleep(1)
print('Producer done')
q.close()
async def consumer(q):
try:
while True:
item = await q.get()
print('Consuming', item)
except QueueClosed:
print('Consumer done')
q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
sched.run()
@antibagr
Copy link

Such an awesome material in a way to understanding asyncio core concepts.

@mthipparthi
Copy link

mthipparthi commented Apr 5, 2021

Such an awesome material in a way to understanding asyncio core concepts.
Really best explanation. Lot to learn.

@abdelrhman-adel-ahmed
Copy link

the problem with the code in the last example lies here :
self.tasks.append(func) ,so after the countup ,countdown,producer,consumer finishes and we start the client, the last function that enter here deadline,seq,func=heapq.heappop(self.sleeping) lets say for instance is the consumer so it will get added again despite it already finished ! so that why when we send to it it gives the error that we cannot send to already awaited,
and if we just connect to the server in the middle while them execute that instance of the client will not producer an error

@abdelrhman-adel-ahmed
Copy link

abdelrhman-adel-ahmed commented Aug 11, 2021

solution is :instead of append the function that we poped at the end after the while loop we just append it right away
deadline,seq,func=heapq.heappop(self.sleeping)
heapq.heappush(self.sleeping, (deadline,seq,func))

@abdelrhman-adel-ahmed
Copy link

note : the error will have different flavor ,weather a callback function was the last or a coroutine func was the last

@jerryan999
Copy link

awesome!

@csrgxtu
Copy link

csrgxtu commented Apr 10, 2023

really really great.

@bartwroblewski
Copy link

A question regarding the part concerning TCP server/sockets: wouldn't this code benefit even further from using non-blocking sockets (socket.setblocking(False)? And then yielding/awaiting whenever such socket is not ready and produces BlockingIOError?
Similar to how it is done here: https://www.pythonsheets.com/appendix/python-concurrent.html

@dabeaz
Copy link
Author

dabeaz commented Nov 11, 2023

Using non-blocking I/O is something you would do in a production system. It's not used here because this was part of a course and there are only so many concepts one can absorb before one's head explodes. Fiddling around with socket options seemed like more of a distraction than helpful in this context.

@mesropt
Copy link

mesropt commented Dec 21, 2023

Thank you!

@huimin-he
Copy link

Thank you for the great workshop!

    async def put(self, item):
        if self._closed:
            raise QueueClosed()

        self.items.append(item)
        if self.waiting:
            sched.ready.append(self.waiting.popleft())

feel we could remove async notation from the put function because it is not using any await

@capymind
Copy link

capymind commented Apr 7, 2024

I would be really hard on understanding what async is without your code and conference talks.
Thank you.

@capymind
Copy link

capymind commented Apr 7, 2024

@huimin-he
Yeah, beazley don't add async keyword to put method at first time actually.
However, it was design choice to add async to put (consistency of API in terms of user usage maybe) he mentioned at youtube vedioe around 1:38:00-1:40:00. It toally make sense to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment