Last active
April 4, 2018 07:08
-
-
Save thehesiod/421953b9c5d5c663872c1fa6822d5006 to your computer and use it in GitHub Desktop.
S3 Latency Limiter, parallel get_object wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from typing import List | |
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger):
    """Consume and discard the outcome of a finished task.

    Retrieving the result (even just to throw it away) prevents asyncio from
    logging "Task exception was never retrieved" for tasks we intentionally
    abandon.

    :param task: a completed (done or cancelled) future/task
    :param logger: available for diagnostic logging of the swallowed outcome
    """
    # Deliberately broad: expected failures (e.g. boto 404s) and
    # CancelledError (a BaseException on modern Python) must both be eaten.
    # noinspection PyBroadException
    try:
        task.result()
    except BaseException:
        return
async def _wait_task_cancellations(cancelled_tasks: List[asyncio.Task], logger: logging.Logger, loop: asyncio.BaseEventLoop = None, log_cancellation: bool = False, log_exceptions: bool = True):
    """Wait for already-cancelled tasks to settle, logging their outcomes.

    :param cancelled_tasks: tasks that have had ``cancel()`` called on them
        (or are otherwise expected to finish soon)
    :param logger: logger used for cancellation/exception reports
    :param loop: kept for backward compatibility but no longer used —
        ``asyncio.wait``'s ``loop`` parameter was removed in Python 3.10
    :param log_cancellation: if True, log tasks that ended via CancelledError
    :param log_exceptions: if True, log tasks that ended with another exception
    """
    if not cancelled_tasks:
        return

    # Wait for the tasks to finish; retrieving each result below also marks
    # any exception as "retrieved" so asyncio doesn't warn about it later.
    done_futures, _ = await asyncio.wait(cancelled_tasks)
    for fut in done_futures:
        try:
            fut.result()
        except asyncio.CancelledError:
            if log_cancellation:
                logger.exception('Task was cancelled')
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # propagate instead of being swallowed silently.
            if log_exceptions:
                logger.exception("Task raised unexpected exception during cancellation")
def relay_wrapper(func: "_async_method", num_parallel: int, logger: logging.Logger):
    """
    Wrap `func` so each call runs `num_parallel` invocations in parallel,
    returning the result of the first that completes and cancelling the rest.

    :param func: async method to call
    :param num_parallel: number of calls to run in parallel
    :param logger: logger passed to the task-cleanup helpers
    :return: an async wrapper with the same call signature as `func`
    """
    async def wrapper(*args, **kwargs):
        # asyncio.wait() requires Tasks/Futures — passing bare coroutines was
        # deprecated in 3.8 and removed in 3.11 — so wrap each call explicitly.
        tasks = [asyncio.ensure_future(func(*args, **kwargs)) for _ in range(num_parallel)]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        # Find the first non-exception result.
        # NOTE: we must retrieve the exception from every finished task or
        # asyncio logs "Task exception was never retrieved".
        missing = object()  # local sentinel; a real task is never `missing`
        exception_tasks: List[asyncio.Task] = []
        result_task = missing
        for task in done:
            if task.cancelled() or task.exception():
                exception_tasks.append(task)
            elif result_task is missing:
                result_task = task

        # If every finished task failed, surface the first failure to the caller.
        if result_task is missing:
            result_task = exception_tasks.pop(0)

        try:
            return result_task.result()
        finally:
            # BUG FIX: the losing tasks must actually be cancelled; previously
            # they were only awaited, so the wrapper waited for *every*
            # parallel call to finish, defeating its purpose.
            for task in pending:
                task.cancel()
            # While being cancelled, some may still finish with an exception —
            # ignore those quietly.
            await _wait_task_cancellations(pending, logger, None, False, False)
            for task in exception_tasks:
                _ignore_task_exception(task, logger)

    return wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
We had a lot of issues trying to get our S3 latency consistently < 5s, so after guidance from Amazon we came up with this gist.
To use it, wrap your call site like `get_object = relay_wrapper(client.get_object, 3, logger)`.
This way it will issue 3 requests in parallel, and the fastest one wins, with the others being cancelled if they have not already finished.