@gmbnomis
Created May 27, 2018 10:51
Async pipeline stages

import asyncio
import functools
import time
from contextlib import suppress

start_time = time.time()


def print_t(msg):
    t = time.time() - start_time
    print(f"{t:04.1f}: {msg}")


class Content:
    def __init__(self, number):
        self.number = number
        self.stage = 0

    def __str__(self):
        return f"Content {self.number}, stage {self.stage}"

    async def do_work(self):
        """Work on the content item.

        Usually this coroutine will become a task.
        """
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print_t(f"{self}: got cancelled while working!")
            raise
        self.stage += 1
        print_t(f"{self} done")
        return self
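

# Note: the concurrent stages below turn do_work() into a task explicitly
# (asyncio.ensure_future(content.do_work())) so that several items can be
# worked on at once.
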
# Stages using asynchronous generators
# A stage is an asynchronous generator that gets an asynchronous
# iterator as an input. The input iterator may be None (first stage).
#
# All stages do the same in this example: Call do_work() for all Content
# instances.
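

# For illustration only (not used below): the smallest stage following this
# protocol is a pass-through that forwards items unchanged.
async def stage_passthrough(input_aiter):
    """Example stage: forward every item from the input iterator unchanged."""
    async for content in input_aiter:
        yield content
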
async def stage_gen(_input_aiter, number=10):
    """First stage: yield a new Content object every 0.1 seconds (no work done)."""
    for i in range(number):
        await asyncio.sleep(0.1)
        yield Content(i)


async def stage_concurrent(input_aiter, max_concurrent=2, max_backlog=2):
    """Stage that runs do_work() concurrently (with limited concurrency)."""
    # Getting the next item of the input iterator is just one of the
    # futures we schedule.
    next_future = asyncio.ensure_future(input_aiter.__anext__())
    pending = {next_future}
    backlog = set()
    content_task_count = 0
    saturated = False
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for future in done:
            if future is next_future:
                assert not saturated
                # The input iterator yielded a new Content object. Let's see
                # whether we can work on it (alternatively, put it in the backlog).
                try:
                    content = future.result()
                except StopAsyncIteration:
                    # The input iterator is depleted. Don't re-schedule
                    # __anext__() anymore in this case.
                    next_future = None
                    continue
                if content_task_count < max_concurrent:
                    content_task_count += 1
                    # Wrap the coroutine in a task explicitly; asyncio.wait()
                    # no longer accepts bare coroutines (since Python 3.11).
                    pending.add(asyncio.ensure_future(content.do_work()))
                    print_t(f"stage_concurrent: Adding '{content}' to pending Futures ({content_task_count} tasks)")
                else:
                    print_t(f"stage_concurrent: Put '{content}' into backlog")
                    backlog.add(content)
                # Don't forget to re-schedule the '__anext__()' Future:
                if len(backlog) < max_backlog:
                    next_future = asyncio.ensure_future(input_aiter.__anext__())
                    pending.add(next_future)
                else:
                    print_t("stage_concurrent: Stop getting new upstream items")
                    saturated = True
                    next_future = None
            else:
                # One of our content workers has completed.
                yield future.result()
                content_task_count -= 1
                # Let's see whether we can take over work from the backlog.
                if backlog and content_task_count < max_concurrent:
                    res = backlog.pop()
                    content_task_count += 1
                    pending.add(asyncio.ensure_future(res.do_work()))
                    print_t(f"stage_concurrent: Move '{res}' from backlog to pending Futures ({content_task_count} tasks)")
                if saturated and len(backlog) < max_backlog:
                    print_t("stage_concurrent: Start getting new upstream items again")
                    saturated = False
                    next_future = asyncio.ensure_future(input_aiter.__anext__())
                    pending.add(next_future)

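
# Alternative sketch (not in the original gist, not used below): if throttling
# the input is not required, an asyncio.Semaphore can replace the explicit
# backlog bookkeeping. Note that this eagerly turns the whole input into
# tasks, i.e. it gives up the max_backlog backpressure of stage_concurrent.
async def stage_semaphore(input_aiter, max_concurrent=2):
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded_work(content):
        # At most max_concurrent do_work() calls run at the same time.
        async with sem:
            return await content.do_work()

    tasks = [asyncio.ensure_future(bounded_work(c)) async for c in input_aiter]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done
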
async def stage_sequential(input_aiter):
    """Stage that works sequentially: one worker at a time."""
    async for content in input_aiter:
        print_t(f"stage_sequential: begin work on {content}")
        yield await content.do_work()


async def stage_all(input_aiter):
    """Stage that gathers all Content from the input iterator before working sequentially on it."""
    all_content = [content async for content in input_aiter]
    for content in all_content:
        print_t(f"stage_all: begin work on {content}")
        yield await content.do_work()


async def run_stages(stages):
    """Run a list of generator stages and print the final results."""
    # functools.reduce chains the stages: each stage is called with the
    # previous stage's async generator as its input (None for the first).
    async for c in functools.reduce(lambda x, y: y(x), stages, None):
        print_t(f"Final result: {c}")
# Stages using queues (producer/consumer pattern)
# A stage gets an input and an output queue. (In this example,
# stages are simply async functions taking two queue parameters)
#
# All stages do the same in this example: Call do_work() for all Content
# instances from the input queue and deliver them to the output queue.
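

# For illustration only (not used below): the smallest queue stage forwards
# items unchanged and propagates the None end-of-stream sentinel.
async def queue_stage_passthrough(in_q, out_q):
    """Example stage: forward every item from in_q to out_q unchanged."""
    while True:
        content = await in_q.get()
        await out_q.put(content)
        if content is None:
            break
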
async def queue_stage_gen(_in_q, out_q, number=10):
    """First stage: queue a new Content object every 0.1 seconds (no work done)."""
    for i in range(number):
        await asyncio.sleep(0.1)
        c = Content(i)
        await out_q.put(c)
        print_t(f"queue_stage_gen: queued {c}")
    await out_q.put(None)  # None signals that there are no more content items


async def queue_stage_sequential(in_q, out_q):
    """Stage that works sequentially, one item at a time."""
    while True:
        content = await in_q.get()
        if content is None:
            # The producer emits None to indicate that it is done.
            break
        await out_q.put(await content.do_work())
    await out_q.put(None)


async def queue_stage_all(in_q, out_q):
    """Stage that gathers all Content from the input queue before working sequentially on it."""
    all_content = set()
    while True:
        content = await in_q.get()
        if content is None:
            # The producer emits None to indicate that it is done.
            break
        all_content.add(content)
    for content in all_content:
        print_t(f"queue_stage_all: begin work on {content}")
        await out_q.put(await content.do_work())
    await out_q.put(None)


class QueueStageConcurrent:
    """Stage that runs do_work() concurrently (with limited concurrency)."""

    def __init__(self, max_concurrent=2):
        self.max_concurrent = max_concurrent

    async def run(self, in_q, out_q):
        self._pending = set()
        self._schedule_next_future(in_q)
        saturated = False
        try:
            while self._pending:
                done, self._pending = await asyncio.wait(self._pending, return_when=asyncio.FIRST_COMPLETED)
                for future in done:
                    if future is self._next_future:
                        # The input queue yielded a new Content object.
                        assert not saturated
                        content = future.result()
                        if content is not None:
                            self._schedule_work(content)
                            if len(self._pending) < self.max_concurrent:
                                self._schedule_next_future(in_q)
                            else:
                                saturated = True
                                self._next_future = None
                                print_t("queue_stage_concurrent: Stop getting new upstream items")
                        else:
                            # Input stream depleted.
                            continue
                    else:
                        # One of our workers has finished.
                        await out_q.put(future.result())
                        if saturated:
                            self._schedule_next_future(in_q)
                            print_t("queue_stage_concurrent: Start getting new upstream items again")
                            saturated = False
            await out_q.put(None)
        except asyncio.CancelledError:
            # asyncio.wait() does not cancel its tasks when it is itself
            # cancelled; we have to do that ourselves.
            self._cancel_pending()
            raise

    def _schedule_next_future(self, in_q):
        """Schedule getting the next item from the input queue in_q."""
        self._next_future = asyncio.ensure_future(in_q.get())
        self._pending.add(self._next_future)

    def _schedule_work(self, content):
        print_t(f"queue_stage_concurrent: Adding '{content}' to pending Futures")
        future = asyncio.ensure_future(content.do_work())
        self._pending.add(future)

    def _cancel_pending(self):
        for future in self._pending:
            future.cancel()


async def queue_sink(in_q, out_q, expected_number=10):
    """Stage that acts as a sink (no work done)."""
    number = 0
    while True:
        content = await in_q.get()
        if content is None:
            break
        number += 1
        print_t(f"queue_sink: {content}")
    assert number == expected_number
    await out_q.put(None)


async def queue_run_stages(stages):
    """Wire the stages together with bounded queues and run them to completion."""
    in_q = None
    futures = []
    for stage in stages:
        out_q = asyncio.Queue(maxsize=2)  # bounded queues provide backpressure
        futures.append(stage(in_q, out_q))
        in_q = out_q
    await asyncio.gather(*futures)


print("""
Async Iterator Stages
---------------------
""")
loop = asyncio.get_event_loop()
start_time = time.time()
loop.run_until_complete(run_stages([stage_gen, stage_concurrent, stage_all]))
print("""
asyncio.Queue Stages
--------------------
""")
start_time = time.time()
concurrent = QueueStageConcurrent()
loop.run_until_complete(queue_run_stages([queue_stage_gen, concurrent.run, queue_stage_all, queue_sink]))
print("""
Cancel asyncio.Queue stages after 2 seconds
-------------------------------------------
""")
async def timed_cancel_task(future, timeout=2.0):
    await asyncio.sleep(timeout)
    future.cancel()


start_time = time.time()
concurrent = QueueStageConcurrent()
run_stages_future = asyncio.ensure_future(queue_run_stages([queue_stage_gen, concurrent.run, queue_stage_all, queue_sink]))
with suppress(asyncio.CancelledError):
    loop.run_until_complete(asyncio.gather(run_stages_future, timed_cancel_task(run_stages_future)))