@thehesiod
Last active April 4, 2018 07:08
S3 Latency Limiter, parallel get_object wrapper
import asyncio
import logging
from typing import Awaitable, Callable, List

# Sentinel used to distinguish "no result yet" from a legitimate None result.
_sentinel = object()


def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger):
    # noinspection PyBroadException
    try:
        task.result()
    except BaseException:
        # These may be things like boto 404s
        pass
        # logger.info("Ignoring exception", exc_info=sys.exc_info())


async def _wait_task_cancellations(cancelled_tasks: List[asyncio.Task],
                                   logger: logging.Logger,
                                   log_cancellation: bool = False,
                                   log_exceptions: bool = True):
    if not cancelled_tasks:
        return

    # now wait for the cancelled tasks to finish and log any exceptions encountered
    done_futures, _ = await asyncio.wait(cancelled_tasks)
    for fut in done_futures:
        try:
            fut.result()
        except asyncio.CancelledError:
            if log_cancellation:
                logger.exception('Task was cancelled')
        except BaseException:
            if log_exceptions:
                logger.exception("Task raised unexpected exception during cancellation")


def relay_wrapper(func: Callable[..., Awaitable], num_parallel: int, logger: logging.Logger):
    """
    Wraps `func` so each call runs `num_parallel` invocations in parallel, returning the
    result from the first that completes and cancelling the others.

    :param func: async method to call
    :param num_parallel: number of calls to run in parallel
    :param logger: logger to use
    :return: the wrapped async method
    """
    async def wrapper(*args, **kwargs):
        tasks = [asyncio.ensure_future(func(*args, **kwargs)) for _ in range(num_parallel)]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        # Find first non-exception result
        # NOTE: we need to retrieve the exceptions or else we'll get errors about
        # "Task exception was never retrieved"
        exception_tasks: List[asyncio.Task] = list()
        result_task = _sentinel
        for task in done:
            if task.cancelled() or task.exception():
                exception_tasks.append(task)
            elif result_task is _sentinel:
                # otherwise return first result
                result_task = task

        # if every completed task failed, re-raise the first failure
        if result_task is _sentinel:
            result_task = exception_tasks.pop(0)

        try:
            return result_task.result()
        finally:
            # cancel the losers, then wait for the cancellations to complete
            # NOTE: while waiting for the tasks to get cancelled, some may finish with an
            # exception, so we want to ignore those
            for task in pending:
                task.cancel()
            await _wait_task_cancellations(pending, logger, log_cancellation=False,
                                           log_exceptions=False)
            for task in exception_tasks:
                _ignore_task_exception(task, logger)
    return wrapper
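
As a quick sanity check, here is a minimal, self-contained sketch with no S3 involved; the `fetch` coroutine, its key, and the random delays are made up purely for illustration. It shows the wrapper returning the fastest of three racing calls and cancelling the rest:

import asyncio
import logging
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

async def fetch(key):
    # stand-in for a real network call with variable latency
    delay = random.uniform(0.1, 1.0)
    await asyncio.sleep(delay)
    return key, delay

async def main():
    hedged_fetch = relay_wrapper(fetch, 3, logger)
    key, delay = await hedged_fetch('some-key')
    print(f'winner for {key} took {delay:.2f}s')

asyncio.run(main())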
@thehesiod (Author) commented:

We had a lot of issues trying to get our S3 latency consistently < 5s, so after guidance from Amazon we came up with this gist.

To use it, do something like:

import aiobotocore
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
session = aiobotocore.get_session()
async with session.create_client('s3') as s3_client:
    get_object = relay_wrapper(s3_client.get_object, 3, logger)
    response = await get_object(Bucket='bucket', Key='key')

This way it issues 3 requests in parallel; the fastest one wins, and the others are cancelled if they haven't already finished.
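
If you also want a hard cap on the end-to-end call (the < 5s target mentioned above), one option, just a sketch layered on top of the gist rather than something it provides, is to wrap the hedged call in asyncio.wait_for; the 5-second figure is only that stated target:

try:
    response = await asyncio.wait_for(get_object(Bucket='bucket', Key='key'), timeout=5.0)
except asyncio.TimeoutError:
    # all hedged attempts blew past the deadline
    logger.warning('hedged get_object exceeded the 5s deadline')
    raise

One caveat with this sketch: the timeout cancels the wrapper coroutine itself, and in that path the in-flight attempts it started are not awaited, so you may see "Task was destroyed but it is pending" warnings at shutdown.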
