Created
May 27, 2018 10:51
-
-
Save gmbnomis/11f5253aeb80539b20aca64540b12d20 to your computer and use it in GitHub Desktop.
Async pipeline stages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import functools | |
import time | |
from contextlib import suppress | |
# Reference point for print_t(); the demo code below re-assigns it before each run.
start_time = time.time()


def print_t(msg):
    """Print *msg* prefixed by the seconds elapsed since ``start_time``.

    ``start_time`` is looked up as a module global on every call, so
    re-assigning it resets the clock for subsequent messages.
    """
    elapsed = time.time() - start_time
    print(f"{elapsed:04.1f}: {msg}")
class Content:
    """A work item that counts how many times do_work() has completed on it."""

    def __init__(self, number):
        self.number = number  # identifier shown in log output
        self.stage = 0  # incremented by every successful do_work()

    def __str__(self):
        return f"Content {self.number}, stage {self.stage}"

    async def do_work(self):
        """Work on the content item (simulated by a 0.5 s sleep).

        Usually this coroutine will become a task. On success the stage
        counter is advanced, a message is logged, and ``self`` is returned
        so the item can flow to the next stage.

        Raises:
            asyncio.CancelledError: logged and re-raised if cancelled while
                sleeping; the stage counter is left unchanged.
        """
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print_t(f"{self}: got cancelled while working!")
            raise
        else:
            self.stage += 1
            print_t(f"{self} done")
            return self
# Stages using asynchronous generators | |
# A stage is an asynchronous generator that gets an asynchronous | |
# iterator as an input. The input iterator may be None (first stage). | |
# | |
# All stages do the same in this example: Call do_work() for all Content | |
# instances. | |
async def stage_gen(_input_aiter, number=10):
    """First stage: yield a new Content object every 0.1 seconds (no work done).

    ``_input_aiter`` is ignored; it exists only so every stage shares the
    same one-argument calling convention.
    """
    for index in range(number):
        await asyncio.sleep(0.1)
        fresh = Content(index)
        yield fresh
async def stage_concurrent(input_aiter, max_concurrent=2, max_backlog=2):
    """Stage that runs do_work() concurrently (with limited concurrency).

    At most ``max_concurrent`` do_work() tasks run at once. Items that arrive
    while every worker slot is busy are parked in a FIFO backlog. Once the
    backlog holds ``max_backlog`` items, reading from ``input_aiter`` pauses
    until a worker finishes. Finished items are yielded in completion order.

    If the generator is closed or cancelled early, or a worker raises, all
    still-pending tasks (workers and the pending ``__anext__()``) are
    cancelled, because ``asyncio.wait`` does not cancel them by itself.
    """
    # Getting the next item of the input iterator is just one more of the
    # futures we wait on.
    next_future = asyncio.ensure_future(input_aiter.__anext__())
    pending = {next_future}
    # A list keeps arrival order (a set would hand items out arbitrarily).
    # It never holds more than max_backlog items, so pop(0) is cheap.
    backlog = []
    content_task_count = 0
    saturated = False
    try:
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for future in done:
                if future is next_future:
                    assert not saturated
                    # The input iterator yielded a new Content object. Let's see
                    # whether we can work on it (alternatively, put it in the backlog).
                    try:
                        content = future.result()
                    except StopAsyncIteration:
                        # Input iterator is depleted. Don't re-schedule
                        # __anext__() anymore in this case.
                        next_future = None
                        continue
                    if content_task_count < max_concurrent:
                        content_task_count += 1
                        # asyncio.wait() requires tasks/futures, not bare coroutines
                        # (passing coroutines is an error since Python 3.11).
                        pending.add(asyncio.ensure_future(content.do_work()))
                        print_t(f"stage_concurrent: Adding '{content}' to pending Futures ({content_task_count} tasks)")
                    else:
                        print_t(f"stage_concurrent: Put '{content}' to backlog")
                        backlog.append(content)
                    # Re-schedule '__anext__()' unless the backlog is full.
                    if len(backlog) < max_backlog:
                        next_future = asyncio.ensure_future(input_aiter.__anext__())
                        pending.add(next_future)
                    else:
                        print_t("stage_concurrent: Stop getting new upstream items")
                        saturated = True
                        next_future = None
                else:
                    # One of our content workers has completed.
                    yield future.result()
                    content_task_count -= 1
                    # Let's see whether we can add work from the backlog.
                    if backlog and content_task_count < max_concurrent:
                        res = backlog.pop(0)
                        content_task_count += 1
                        pending.add(asyncio.ensure_future(res.do_work()))
                        print_t(f"stage_concurrent: Move '{res}' from backlog to pending Futures ({content_task_count} tasks)")
                    if saturated and len(backlog) < max_backlog:
                        print_t("stage_concurrent: Start getting new upstream items again")
                        saturated = False
                        next_future = asyncio.ensure_future(input_aiter.__anext__())
                        pending.add(next_future)
    finally:
        # On normal completion `pending` is empty and this is a no-op. On
        # close/cancel/error it stops orphaned tasks from running on.
        for leftover in pending:
            leftover.cancel()
async def stage_sequential(input_aiter):
    """Stage that works sequentially: one do_work() call at a time, in input order."""
    async for item in input_aiter:
        print_t(f"stage_sequential: begin work on {item}")
        finished = await item.do_work()
        yield finished
async def stage_all(input_aiter):
    """Drain *input_aiter* completely, then work on the collected items one by one.

    Items are processed in the order they were received.
    """
    collected = []
    async for item in input_aiter:
        collected.append(item)
    for item in collected:
        print_t(f"stage_all: begin work on {item}")
        yield await item.do_work()
async def run_stages(stages):
    """Run a list of async-generator stages chained end to end.

    The first stage receives ``None`` as its input; each later stage receives
    the previous stage's generator. Every item produced by the last stage is
    logged.
    """
    pipeline = None
    for stage in stages:
        pipeline = stage(pipeline)
    async for result in pipeline:
        print_t(f"Final result: {result}")
# Stages using queues (producer/consumer pattern) | |
# A stage gets an input and an output queue. (In this example, | |
# stages are simply async functions taking two queue parameters) | |
# | |
# All stages do the same in this example: Call do_work() for all Content | |
# instances from the input queue and deliver them to the output queue. | |
async def queue_stage_gen(_in_q, out_q, number=10):
    """First stage: put a new Content object on *out_q* every 0.1 seconds (no work done).

    ``_in_q`` is ignored (this is the source stage). After ``number`` items,
    a ``None`` end-of-stream marker is put on the queue.
    """
    for index in range(number):
        await asyncio.sleep(0.1)
        item = Content(index)
        await out_q.put(item)
        print_t(f"queue_stage_gen: queued {item}")
    # None signals to the downstream stage that there are no more content items.
    await out_q.put(None)
async def queue_stage_sequential(in_q, out_q):
    """Stage that works sequentially, one item at a time.

    Reads items from *in_q* until the ``None`` end-of-stream marker, puts each
    do_work() result on *out_q*, then forwards ``None`` downstream.
    """
    item = await in_q.get()
    while item is not None:
        processed = await item.do_work()
        await out_q.put(processed)
        item = await in_q.get()
    await out_q.put(None)
async def queue_stage_all(in_q, out_q):
    """Stage that gathers all Content from the input queue first, then works on it sequentially.

    Items are read from *in_q* until the ``None`` end-of-stream marker. They
    are then processed one at a time in arrival order (matching
    ``stage_all``), with each result put on *out_q*. A final ``None`` is
    forwarded downstream.
    """
    # A list (not a set) keeps arrival order, so items are worked on
    # deterministically; a set would iterate in identity-hash order.
    all_content = []
    while True:
        content = await in_q.get()
        if content is None:
            # The producer emits None to indicate that it is done.
            break
        all_content.append(content)
    for content in all_content:
        print_t(f"queue_stage_all: begin work on {content}")
        await out_q.put(await content.do_work())
    await out_q.put(None)
class QueueStageConcurrent:
    """Stage that runs do_work() concurrently (with limited concurrency).

    New items are read from the input queue while fewer than
    ``max_concurrent`` workers are running. Once that limit is reached,
    reading pauses until a worker finishes. Finished items are put on the
    output queue in completion order, followed by a ``None`` end-of-stream
    marker.

    Per-run state is stored on the instance, so one instance should drive
    only one ``run()`` at a time.
    """

    def __init__(self, max_concurrent=2):
        self.max_concurrent = max_concurrent
        self._pending = set()  # futures currently awaited by run()
        self._next_future = None  # the in-flight in_q.get(), if any

    async def run(self, in_q, out_q):
        """Consume *in_q* until ``None`` and put worked items on *out_q*.

        Whenever this coroutine exits, any still-pending futures are
        cancelled, because ``asyncio.wait`` does not cancel them itself. This
        covers cancellation and exceptions raised by a worker.
        """
        self._pending = set()
        self._schedule_next_future(in_q)
        saturated = False
        try:
            while self._pending:
                done, self._pending = await asyncio.wait(self._pending, return_when=asyncio.FIRST_COMPLETED)
                for future in done:
                    if future is self._next_future:
                        # The input queue yielded a new Content object (or None).
                        assert not saturated
                        content = future.result()
                        if content is None:
                            # Input stream depleted: stop reading from in_q.
                            continue
                        self._schedule_work(content)
                        # Only workers are in _pending here (the get() just finished).
                        if len(self._pending) < self.max_concurrent:
                            self._schedule_next_future(in_q)
                        else:
                            saturated = True
                            self._next_future = None
                            print_t("queue_stage_concurrent: Stop getting new upstream items")
                    else:
                        # One of our workers has finished.
                        await out_q.put(future.result())
                        if saturated:
                            self._schedule_next_future(in_q)
                            print_t("queue_stage_concurrent: Start getting new upstream items again")
                            saturated = False
            await out_q.put(None)
        finally:
            # No-op on normal completion (_pending is empty then).
            self._cancel_pending()

    def _schedule_next_future(self, in_q):
        """Schedule getting the next item from the in queue in_q."""
        self._next_future = asyncio.ensure_future(in_q.get())
        self._pending.add(self._next_future)

    def _schedule_work(self, content):
        """Start content.do_work() as a task and track it in _pending."""
        print_t(f"queue_stage_concurrent: Adding '{content}' to pending Futures")
        future = asyncio.ensure_future(content.do_work())
        self._pending.add(future)

    def _cancel_pending(self):
        """Cancel every future still tracked in _pending."""
        for future in self._pending:
            future.cancel()
async def queue_sink(in_q, out_q, expected_number=10):
    """Stage that acts as a sink (no work done).

    Logs every item from *in_q* up to the ``None`` marker, asserts that
    exactly ``expected_number`` items arrived, then puts ``None`` on *out_q*.
    """
    received = 0
    item = await in_q.get()
    while item is not None:
        received += 1
        print_t(f"queue_sink: {item}")
        item = await in_q.get()
    assert received == expected_number
    await out_q.put(None)
async def queue_run_stages(stages):
    """Wire *stages* together with bounded queues and run them all concurrently.

    Each stage is called as ``stage(in_q, out_q)``. The first stage gets
    ``None`` as its input queue; every other stage reads from the previous
    stage's output queue. Each queue holds at most 2 items, which applies
    back-pressure between stages.
    """
    upstream = None
    coros = []
    for stage in stages:
        downstream = asyncio.Queue(maxsize=2)
        coros.append(stage(upstream, downstream))
        upstream = downstream
    await asyncio.gather(*coros)
print("""
Async Iterator Stages
---------------------
""")
# asyncio.get_event_loop() without a running loop is deprecated since
# Python 3.10 and raises RuntimeError in 3.14. Create a loop explicitly and
# make it current, so later module-level code can still find it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_time = time.time()
loop.run_until_complete(run_stages([stage_gen, stage_concurrent, stage_all]))
print("""
asyncio.Queue Stages
--------------------
""")
start_time = time.time()
concurrent = QueueStageConcurrent()
loop.run_until_complete(queue_run_stages([queue_stage_gen, concurrent.run, queue_stage_all, queue_sink]))
print("""
Cancel asyncio.Queue stages after 2 seconds
-------------------------------------------
""")
async def timed_cancel_task(future, timeout=2.0):
    """Wait *timeout* seconds, then request cancellation of *future*.

    Future.cancel() does nothing if *future* has already finished.
    """
    delay = timeout
    await asyncio.sleep(delay)
    future.cancel()
start_time = time.time()
concurrent = QueueStageConcurrent()
# Schedule on the explicit loop: module-level asyncio.ensure_future(coro)
# with no running loop is deprecated since Python 3.10.
run_stages_future = loop.create_task(queue_run_stages([queue_stage_gen, concurrent.run, queue_stage_all, queue_sink]))
with suppress(asyncio.CancelledError):
    # The pipeline is cancelled by timed_cancel_task, so gather() ends with
    # CancelledError, which is expected here.
    loop.run_until_complete(asyncio.gather(run_stages_future, timed_cancel_task(run_stages_future)))
# Release the loop's resources once all demos are finished.
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment