-
-
Save dabeaz/f86ded8d61206c757c5cd4dbb5109f74 to your computer and use it in GitHub Desktop.
# aproducer.py | |
# | |
# Async Producer-consumer problem. | |
# Challenge: How to implement the same functionality, but no threads. | |
import time | |
from collections import deque | |
import heapq | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() # Functions ready to execute | |
self.sleeping = [] # Sleeping functions | |
self.sequence = 0 | |
def call_soon(self, func): | |
self.ready.append(func) | |
def call_later(self, delay, func): | |
self.sequence += 1 | |
deadline = time.time() + delay # Expiration time | |
# Priority queue | |
heapq.heappush(self.sleeping, (deadline, self.sequence, func)) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
# Find the nearest deadline | |
deadline, _, func = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(func) | |
while self.ready: | |
func = self.ready.popleft() | |
func() | |
sched = Scheduler() # Behind scenes scheduler object | |
# ----- | |
class AsyncQueue: | |
def __init__(self): | |
self.items = deque() | |
self.waiting = deque() # All getters waiting for data | |
def put(self, item): | |
self.items.append(item) | |
if self.waiting: | |
func = self.waiting.popleft() | |
# Do we call it right away? No. Schedule it to be called. | |
sched.call_soon(func) | |
def get(self, callback): | |
# Wait until an item is available. Then return it | |
if self.items: | |
callback(self.items.popleft()) | |
else: | |
self.waiting.append(lambda: self.get(callback)) | |
def producer(q, count): | |
def _run(n): | |
if n < count: | |
print('Producing', n) | |
q.put(n) | |
sched.call_later(1, lambda: _run(n+1)) | |
else: | |
print('Producer done') | |
q.put(None) | |
_run(0) | |
def consumer(q): | |
def _consume(item): | |
if item is None: | |
print('Consumer done') | |
else: | |
print('Consuming', item) | |
sched.call_soon(lambda: consumer(q)) | |
q.get(callback=_consume) | |
q = AsyncQueue() | |
sched.call_soon(lambda: producer(q, 10)) | |
sched.call_soon(lambda: consumer(q,)) | |
sched.run() | |
# aproducer_error.py | |
# | |
# Example of returning results + errors in callback based code. | |
import time | |
from collections import deque | |
import heapq | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() # Functions ready to execute | |
self.sleeping = [] # Sleeping functions | |
self.sequence = 0 | |
def call_soon(self, func): | |
self.ready.append(func) | |
def call_later(self, delay, func): | |
self.sequence += 1 | |
deadline = time.time() + delay # Expiration time | |
# Priority queue | |
heapq.heappush(self.sleeping, (deadline, self.sequence, func)) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
# Find the nearest deadline | |
deadline, _, func = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(func) | |
while self.ready: | |
func = self.ready.popleft() | |
func() | |
sched = Scheduler() # Behind scenes scheduler object | |
# ----- | |
# Class used to communicate both a normal value or an exception | |
class Result: | |
def __init__(self, value=None, exc=None): | |
self.value = value | |
self.exc = exc | |
def result(self): | |
if self.exc: | |
raise self.exc | |
else: | |
return self.value | |
class AsyncQueue: | |
def __init__(self): | |
self.items = deque() | |
self.waiting = deque() # All getters waiting for data | |
self._closed = False # Can queue be used anymore? | |
def close(self): | |
self._closed = True | |
if self.waiting and not self.items: | |
for func in self.waiting: | |
sched.call_soon(func) | |
def put(self, item): | |
if self._closed: | |
raise QueueClosed() | |
self.items.append(item) | |
if self.waiting: | |
func = self.waiting.popleft() | |
# Do we call it right away? | |
sched.call_soon(func) | |
def get(self, callback): | |
# Wait until an item is available. Then return it | |
# Question: How does a closed queue interact with get() | |
if self.items: | |
callback(Result(value=self.items.popleft())) # Good result | |
else: | |
# No items available (must wait) | |
if self._closed: | |
callback(Result(exc=QueueClosed())) # Error result | |
else: | |
self.waiting.append(lambda: self.get(callback)) | |
class QueueClosed(Exception): | |
pass | |
def producer(q, count): | |
def _run(n): | |
if n < count: | |
print('Producing', n) | |
q.put(n) | |
sched.call_later(1, lambda: _run(n+1)) | |
else: | |
print('Producer done') | |
q.close() # Means no more items will be produced | |
_run(0) | |
def consumer(q): | |
def _consume(result): | |
try: | |
item = result.result() | |
print('Consuming', item) | |
sched.call_soon(lambda: consumer(q)) | |
except QueueClosed: | |
print("Consumer done") | |
q.get(callback=_consume) | |
q = AsyncQueue() | |
sched.call_soon(lambda: producer(q, 10)) | |
sched.call_soon(lambda: consumer(q,)) | |
sched.run() | |
# asynco.py | |
# | |
# A basic asynchronous scheduler with support for time | |
import time | |
from collections import deque | |
import heapq | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() # Functions ready to execute | |
self.sleeping = [] # Sleeping functions | |
self.sequence = 0 # Used to break ties in priority queue | |
def call_soon(self, func): | |
self.ready.append(func) | |
def call_later(self, delay, func): | |
self.sequence += 1 | |
deadline = time.time() + delay # Expiration time | |
heapq.heappush(self.sleeping, (deadline, self.sequence, func)) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
# Find the nearest deadline | |
deadline, _, func = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(func) | |
while self.ready: | |
func = self.ready.popleft() | |
func() | |
sched = Scheduler() # Behind scenes scheduler object | |
def countdown(n): | |
if n > 0: | |
print('Down', n) | |
# time.sleep(4) | |
sched.call_later(4, lambda: countdown(n-1)) | |
def countup(stop): | |
def _run(x): | |
if x < stop: | |
print('Up', x) | |
# time.sleep(1) | |
sched.call_later(1, lambda: _run(x+1)) | |
_run(0) | |
sched.call_soon(lambda: countdown(5)) | |
sched.call_soon(lambda: countup(20)) | |
sched.run() |
# coro_callback.py | |
# | |
# An example of how to implement coroutine based concurrency layered | |
# on top of a callback-based scheduler. | |
import time | |
from collections import deque | |
import heapq | |
# Callback based scheduler (from earlier) | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() # Functions ready to execute | |
self.sleeping = [] # Sleeping functions | |
self.sequence = 0 | |
def call_soon(self, func): | |
self.ready.append(func) | |
def call_later(self, delay, func): | |
self.sequence += 1 | |
deadline = time.time() + delay # Expiration time | |
# Priority queue | |
heapq.heappush(self.sleeping, (deadline, self.sequence, func)) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
# Find the nearest deadline | |
deadline, _, func = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(func) | |
while self.ready: | |
func = self.ready.popleft() | |
func() | |
# Coroutine-based functions | |
def new_task(self, coro): | |
self.ready.append(Task(coro)) # Wrapped coroutine | |
async def sleep(self, delay): | |
self.call_later(delay, self.current) | |
self.current = None | |
await switch() # Switch to a new task | |
# Class that wraps a coroutine--making it look like a callback | |
class Task: | |
def __init__(self, coro): | |
self.coro = coro # "Wrapped coroutine" | |
# Make it look like a callback | |
def __call__(self): | |
try: | |
# Driving the coroutine as before | |
sched.current = self | |
self.coro.send(None) | |
if sched.current: | |
sched.ready.append(self) | |
except StopIteration: | |
pass | |
class Awaitable: | |
def __await__(self): | |
yield | |
def switch(): | |
return Awaitable() | |
sched = Scheduler() # Background scheduler object | |
# ---------------- | |
class AsyncQueue: | |
def __init__(self): | |
self.items = deque() | |
self.waiting = deque() | |
async def put(self, item): | |
self.items.append(item) | |
if self.waiting: | |
sched.ready.append(self.waiting.popleft()) | |
async def get(self): | |
if not self.items: | |
self.waiting.append(sched.current) # Put myself to sleep | |
sched.current = None # "Disappear" | |
await switch() # Switch to another task | |
return self.items.popleft() | |
# Coroutine-based tasks | |
async def producer(q, count): | |
for n in range(count): | |
print('Producing', n) | |
await q.put(n) | |
await sched.sleep(1) | |
print('Producer done') | |
await q.put(None) # "Sentinel" to shut down | |
async def consumer(q): | |
while True: | |
item = await q.get() | |
if item is None: | |
break | |
print('Consuming', item) | |
print('Consumer done') | |
q = AsyncQueue() | |
sched.new_task(producer(q, 10)) | |
sched.new_task(consumer(q)) | |
# Call-back based tasks | |
def countdown(n): | |
if n > 0: | |
print('Down', n) | |
# time.sleep(4) # Blocking call (nothing else can run) | |
sched.call_later(4, lambda: countdown(n-1)) | |
def countup(stop): | |
def _run(x): | |
if x < stop: | |
print('Up', x) | |
# time.sleep(1) | |
sched.call_later(1, lambda: _run(x+1)) | |
_run(0) | |
sched.call_soon(lambda: countdown(5)) | |
sched.call_soon(lambda: countup(20)) | |
sched.run() | |
# Build Your Own Async | |
# | |
# David Beazley (@dabeaz) | |
# https://www.dabeaz.com | |
# | |
# Originally presented at PyCon India, Chennai, October 14, 2019 | |
import time | |
def countdown(n): | |
while n > 0: | |
print('Down', n) | |
time.sleep(1) | |
n -= 1 | |
def countup(stop): | |
x = 0 | |
while x < stop: | |
print('Up', x) | |
time.sleep(1) | |
x += 1 | |
# Example of sequential execution | |
countdown(5) | |
countup(5) | |
# Example of concurrent execution (via threads) | |
import threading | |
threading.Thread(target=countdown, args=(5,)).start() | |
threading.Thread(target=countup, args=(5,)).start() |
# io_scheduler.py | |
# | |
# An example of implementing I/O operations in the scheduler | |
import time | |
from collections import deque | |
import heapq | |
from select import select | |
# Callback based scheduler (from earlier) | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() # Functions ready to execute | |
self.sleeping = [] # Sleeping functions | |
self.sequence = 0 | |
self._read_waiting = { } | |
self._write_waiting = { } | |
def call_soon(self, func): | |
self.ready.append(func) | |
def call_later(self, delay, func): | |
self.sequence += 1 | |
deadline = time.time() + delay # Expiration time | |
# Priority queue | |
heapq.heappush(self.sleeping, (deadline, self.sequence, func)) | |
def read_wait(self, fileno, func): | |
# Trigger func() when fileno is readable | |
self._read_waiting[fileno] = func | |
def write_wait(self, fileno, func): | |
# Trigger func() when fileno is writeable | |
self._write_waiting[fileno] = func | |
def run(self): | |
while (self.ready or self.sleeping or self._read_waiting or self._write_waiting): | |
if not self.ready: | |
# Find the nearest deadline | |
if self.sleeping: | |
deadline, _, func = self.sleeping[0] | |
timeout = deadline - time.time() | |
if timeout < 0: | |
timeout = 0 | |
else: | |
timeout = None # Wait forever | |
# Wait for I/O (and sleep) | |
can_read, can_write, _ = select(self._read_waiting, | |
self._write_waiting, [], timeout) | |
for fd in can_read: | |
self.ready.append(self._read_waiting.pop(fd)) | |
for fd in can_write: | |
self.ready.append(self._write_waiting.pop(fd)) | |
# Check for sleeping tasks | |
now = time.time() | |
while self.sleeping: | |
if now > self.sleeping[0][0]: | |
self.ready.append(heapq.heappop(self.sleeping)[2]) | |
else: | |
break | |
while self.ready: | |
func = self.ready.popleft() | |
func() | |
def new_task(self, coro): | |
self.ready.append(Task(coro)) # Wrapped coroutine | |
async def sleep(self, delay): | |
self.call_later(delay, self.current) | |
self.current = None | |
await switch() # Switch to a new task | |
async def recv(self, sock, maxbytes): | |
self.read_wait(sock, self.current) | |
self.current = None | |
await switch() | |
return sock.recv(maxbytes) | |
async def send(self, sock, data): | |
self.write_wait(sock, self.current) | |
self.current = None | |
await switch() | |
return sock.send(data) | |
async def accept(self, sock): | |
self.read_wait(sock, self.current) | |
self.current = None | |
await switch() | |
return sock.accept() | |
class Task: | |
def __init__(self, coro): | |
self.coro = coro # "Wrapped coroutine" | |
# Make it look like a callback | |
def __call__(self): | |
try: | |
# Driving the coroutine as before | |
sched.current = self | |
self.coro.send(None) | |
if sched.current: | |
sched.ready.append(self) | |
except StopIteration: | |
pass | |
class Awaitable: | |
def __await__(self): | |
yield | |
def switch(): | |
return Awaitable() | |
sched = Scheduler() # Background scheduler object | |
# ---------------- | |
from socket import * | |
async def tcp_server(addr): | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.bind(addr) | |
sock.listen(1) | |
while True: | |
client, addr = await sched.accept(sock) | |
print('Connection from', addr) | |
sched.new_task(echo_handler(client)) | |
async def echo_handler(sock): | |
while True: | |
data = await sched.recv(sock, 10000) | |
if not data: | |
break | |
await sched.send(sock, b'Got:' + data) | |
print('Connection closed') | |
sock.close() | |
sched.new_task(tcp_server(('', 30000))) | |
sched.run() | |
# producer.py | |
# | |
# Producer-consumer problem with threads | |
import queue | |
import threading | |
import time | |
def producer(q, count): | |
for n in range(count): | |
print('Producing', n) | |
q.put(n) | |
time.sleep(1) | |
print('Producer done') | |
q.put(None) # "Sentinel" to shut down | |
def consumer(q): | |
while True: | |
item = q.get() | |
if item is None: | |
break | |
print('Consuming', item) | |
print('Consumer done') | |
q = queue.Queue() # Thread-safe queue | |
threading.Thread(target=producer, args=(q, 10)).start() | |
threading.Thread(target=consumer, args=(q,)).start() | |
# yieldo.py | |
# | |
# Example of a coroutine-based scheduler | |
import time | |
from collections import deque | |
import heapq | |
# Plumbing that makes the "await" statement work. We provide | |
# a single function "switch" that is used by the schedule to | |
# switch tasks. | |
class Awaitable: | |
def __await__(self): | |
yield | |
def switch(): | |
return Awaitable() | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() | |
self.sleeping = [ ] | |
self.current = None # Currently executing generator | |
self.sequence = 0 | |
async def sleep(self, delay): | |
deadline = time.time() + delay | |
self.sequence += 1 | |
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current)) | |
self.current = None # "Disappear" | |
await switch() # Switch tasks | |
def new_task(self, coro): | |
self.ready.append(coro) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
deadline, _, coro = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(coro) | |
self.current = self.ready.popleft() | |
# Drive as a generator | |
try: | |
self.current.send(None) # Send to a coroutine | |
if self.current: | |
self.ready.append(self.current) | |
except StopIteration: | |
pass | |
sched = Scheduler() # Background scheduler object | |
# ---- Example code | |
async def countdown(n): | |
while n > 0: | |
print('Down', n) | |
await sched.sleep(4) | |
n -= 1 | |
async def countup(stop): | |
x = 0 | |
while x < stop: | |
print('Up', x) | |
await sched.sleep(1) | |
x += 1 | |
sched.new_task(countdown(5)) | |
sched.new_task(countup(20)) | |
sched.run() |
# yproducer.py | |
# | |
# Coroutine producer-consumer problem. With async-await | |
import time | |
from collections import deque | |
import heapq | |
class Awaitable: | |
def __await__(self): | |
yield | |
def switch(): | |
return Awaitable() | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() | |
self.sleeping = [ ] | |
self.current = None # Currently executing generator | |
self.sequence = 0 | |
async def sleep(self, delay): | |
deadline = time.time() + delay | |
self.sequence += 1 | |
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current)) | |
self.current = None # "Disappear" | |
await switch() # Switch tasks | |
def new_task(self, coro): | |
self.ready.append(coro) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
deadline, _, coro = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(coro) | |
self.current = self.ready.popleft() | |
# Drive as a generator | |
try: | |
self.current.send(None) # Send to a coroutine | |
if self.current: | |
self.ready.append(self.current) | |
except StopIteration: | |
pass | |
sched = Scheduler() # Background scheduler object | |
# ---------------- | |
class AsyncQueue: | |
def __init__(self): | |
self.items = deque() | |
self.waiting = deque() | |
async def put(self, item): | |
self.items.append(item) | |
if self.waiting: | |
sched.ready.append(self.waiting.popleft()) | |
async def get(self): | |
if not self.items: | |
self.waiting.append(sched.current) # Put myself to sleep | |
sched.current = None # "Disappear" | |
await switch() # Switch to another task | |
return self.items.popleft() | |
async def producer(q, count): | |
for n in range(count): | |
print('Producing', n) | |
await q.put(n) | |
await sched.sleep(1) | |
print('Producer done') | |
await q.put(None) # "Sentinel" to shut down | |
async def consumer(q): | |
while True: | |
item = await q.get() | |
if item is None: | |
break | |
print('Consuming', item) | |
print('Consumer done') | |
q = AsyncQueue() | |
sched.new_task(producer(q, 10)) | |
sched.new_task(consumer(q)) | |
sched.run() | |
# yproducer_error.py | |
# | |
# Example of error handling with coroutines | |
import time | |
from collections import deque | |
import heapq | |
class Awaitable: | |
def __await__(self): | |
yield | |
def switch(): | |
return Awaitable() | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() | |
self.sleeping = [ ] | |
self.current = None # Currently executing generator | |
self.sequence = 0 | |
async def sleep(self, delay): | |
deadline = time.time() + delay | |
self.sequence += 1 | |
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current)) | |
self.current = None # "Disappear" | |
await switch() # Switch tasks | |
def new_task(self, coro): | |
self.ready.append(coro) | |
def run(self): | |
while self.ready or self.sleeping: | |
if not self.ready: | |
deadline, _, coro = heapq.heappop(self.sleeping) | |
delta = deadline - time.time() | |
if delta > 0: | |
time.sleep(delta) | |
self.ready.append(coro) | |
self.current = self.ready.popleft() | |
# Drive as a generator | |
try: | |
self.current.send(None) # Send to a coroutine | |
if self.current: | |
self.ready.append(self.current) | |
except StopIteration: | |
pass | |
sched = Scheduler() # Background scheduler object | |
# ---------------- | |
class QueueClosed(Exception): | |
pass | |
class AsyncQueue: | |
def __init__(self): | |
self.items = deque() | |
self.waiting = deque() | |
self._closed = False | |
def close(self): | |
self._closed = True | |
if self.waiting and not self.items: | |
sched.ready.append(self.waiting.popleft()) # Reschedule waiting tasks | |
async def put(self, item): | |
if self._closed: | |
raise QueueClosed() | |
self.items.append(item) | |
if self.waiting: | |
sched.ready.append(self.waiting.popleft()) | |
async def get(self): | |
while not self.items: | |
if self._closed: | |
raise QueueClosed() | |
self.waiting.append(sched.current) # Put myself to sleep | |
sched.current = None # "Disappear" | |
await switch() # Switch to another task | |
return self.items.popleft() | |
async def producer(q, count): | |
for n in range(count): | |
print('Producing', n) | |
await q.put(n) | |
await sched.sleep(1) | |
print('Producer done') | |
q.close() | |
async def consumer(q): | |
try: | |
while True: | |
item = await q.get() | |
print('Consuming', item) | |
except QueueClosed: | |
print('Consumer done') | |
q = AsyncQueue() | |
sched.new_task(producer(q, 10)) | |
sched.new_task(consumer(q)) | |
sched.run() | |
Such an awesome material in a way to understanding asyncio core concepts.
Really best explanation. Lot to learn.
the problem with the code in the last example lies here :
self.tasks.append(func) ,so after the countup ,countdown,producer,consumer finishes and we start the client, the last function that enter here deadline,seq,func=heapq.heappop(self.sleeping) lets say for instance is the consumer so it will get added again despite it already finished ! so that why when we send to it it gives the error that we cannot send to already awaited,
and if we just connect to the server in the middle while them execute that instance of the client will not producer an error
solution is :instead of append the function that we poped at the end after the while loop we just append it right away
deadline,seq,func=heapq.heappop(self.sleeping)
heapq.heappush(self.sleeping, (deadline,seq,func))
note : the error will have different flavor ,weather a callback function was the last or a coroutine func was the last
awesome!
really really great.
A question regarding the part concerning TCP server/sockets: wouldn't this code benefit even further from using non-blocking sockets (socket.setblocking(False)
? And then yielding/awaiting whenever such socket is not ready and produces BlockingIOError
?
Similar to how it is done here: https://www.pythonsheets.com/appendix/python-concurrent.html
Using non-blocking I/O is something you would do in a production system. It's not used here because this was part of a course and there are only so many concepts one can absorb before one's head explodes. Fiddling around with socket options seemed like more of a distraction than helpful in this context.
Thank you!
Thank you for the great workshop!
async def put(self, item):
if self._closed:
raise QueueClosed()
self.items.append(item)
if self.waiting:
sched.ready.append(self.waiting.popleft())
feel we could remove async
notation from the put function because it is not using any await
I would be really hard on understanding what async is without your code and conference talks.
Thank you.
@huimin-he
Yeah, beazley don't add async keyword to put
method at first time actually.
However, it was design choice to add async
to put
(consistency of API in terms of user usage maybe) he mentioned at youtube vedioe around 1:38:00-1:40:00. It toally make sense to me.
Such an awesome material in a way to understanding asyncio core concepts.