
@btoueg
Last active April 20, 2018 08:28
Demonstration of throttling aiohttp with semaphore

I'm trying to achieve fine-grained throttling with aiohttp.

By fine-grained, I mean that limiting how many requests run simultaneously is not enough: I want to rate-limit requests to at most 100 per 10-second interval.
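A semaphore only bounds concurrency. With 100 slots and requests that each take 0.5 s, far more than 100 requests could start within 10 seconds. For comparison, here is a minimal sketch of a different technique, a sliding-window log limiter (not the approach used in this gist), that enforces "at most N starts in any T-second window". The class name `SlidingWindowLimiter` is hypothetical, and the small numbers (3 calls per 0.2 s) are assumptions chosen so the demo runs quickly. For the Trello limit they would be 100 and 10.

```python
import asyncio
import collections
import time


class SlidingWindowLimiter:
    """Hypothetical helper: allow at most `max_calls` starts per `period` seconds.

    Sliding-window log: remember when each recent call started and make
    newcomers sleep until the oldest start falls out of the window.
    """

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self._starts = collections.deque()  # monotonic start times, oldest first
        self._lock = asyncio.Lock()  # one caller at a time does the bookkeeping

    async def wait(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # forget starts that are at least one period old
                while self._starts and now - self._starts[0] >= self.period:
                    self._starts.popleft()
                if len(self._starts) < self.max_calls:
                    self._starts.append(now)
                    return
                # sleep until the oldest recorded start leaves the window
                await asyncio.sleep(self.period - (now - self._starts[0]))


async def demo():
    limiter = SlidingWindowLimiter(max_calls=3, period=0.2)
    begin = time.monotonic()
    starts = []

    async def job():
        await limiter.wait()
        starts.append(time.monotonic() - begin)  # when this job was let through

    await asyncio.gather(*(job() for _ in range(7)))
    return starts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `await limiter.wait()` just before each `session.get(...)` would cap request starts, however long each request takes.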

The interesting part is throttled_coroutine, which acquires and releases a semaphore slot at exactly the rate I want to enforce, by either:

  • waiting out the rest of the interval once a very fast request has completed
  • releasing the slot before a slow request has completed
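The two behaviours listed above can be sketched in isolation, with `asyncio.sleep` standing in for HTTP calls so that the demo runs offline. The helper name `hold_slot_for_interval`, the 2-slot semaphore and the 0.2 s interval are assumptions chosen to keep it fast. The try/except/finally is a variation on the original try/except that also gives the slot back if the request raises.

```python
import asyncio
import time


async def hold_slot_for_interval(coro, sem, interval):
    """Hypothetical helper: run `coro` while occupying one slot of `sem`.

    - fast coro: the slot stays taken until `interval` seconds have passed
    - slow coro: the slot is given back after `interval`, before coro ends
    """
    await sem.acquire()
    task = asyncio.ensure_future(coro)
    start = time.monotonic()
    released = False
    try:
        # shield() keeps the task running if wait_for() times out
        return await asyncio.wait_for(asyncio.shield(task), interval)
    except asyncio.TimeoutError:
        sem.release()  # slow request: free the slot early
        released = True
        return await task
    finally:
        if not released:
            # fast (or failed) request: keep the slot until the interval ends
            elapsed = time.monotonic() - start
            await asyncio.sleep(max(interval - elapsed, 0))
            sem.release()


async def demo():
    sem = asyncio.Semaphore(2)  # 2 slots per 0.2 s instead of 100 per 10 s
    begin = time.monotonic()

    async def fake_request(duration):
        # stand-in for an HTTP call; reports when it actually started
        started = time.monotonic() - begin
        await asyncio.sleep(duration)
        return started

    durations = [0.5, 0.01, 0.01, 0.01]  # one slow request, three fast ones
    jobs = [hold_slot_for_interval(fake_request(d), sem, 0.2) for d in durations]
    return await asyncio.gather(*jobs)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The third and fourth requests start about 0.2 s in. One gets the slot the fast request kept until the interval ended. The other gets the slot the slow request gave up before finishing at 0.5 s.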

At the end of a batch of requests, the last coroutines tend to wait for no reason. This gets worse as the timeout increases.

How should I deal with this (see FIXME)? I've considered checking semaphore._waiters, but that seems like a bad choice: having no waiters at a particular point in time does not guarantee that we are at the end of the batch.

Other than that, I'd welcome feedback on my try/except block and on the lack of a with statement in throttled_coroutine.
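On the FIXME and the missing with statement, one possible alternative is to hand the release to a loop timer (`loop.call_later`) instead of sleeping inside the coroutine. This sketch assumes the desired rule is "each slot stays taken for a fixed interval after it is acquired". The coroutine then returns as soon as its request finishes, the slot stays taken for the rest of the interval, no private attribute such as `_waiters` is consulted, and the acquire/release pair fits in an `async with` block. `IntervalSlot` is a hypothetical name, and the 2-slot, 0.2 s numbers only keep the demo quick.

```python
import asyncio
import time


class IntervalSlot:
    """Hypothetical helper (not part of asyncio or aiohttp).

    Entering takes one slot of `sem`. A loop timer hands the slot back
    `interval` seconds after entry, so the body's caller never sleeps
    just to keep the rate down.
    """

    def __init__(self, sem, interval):
        self.sem = sem
        self.interval = interval

    async def __aenter__(self):
        await self.sem.acquire()
        # exactly one release per acquire, scheduled right away; this covers
        # fast bodies, slow bodies and bodies that raise
        asyncio.get_running_loop().call_later(self.interval, self.sem.release)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # the timer owns the release; never swallow exceptions


async def demo():
    sem = asyncio.Semaphore(2)  # 2 slots per 0.2 s instead of 100 per 10 s
    begin = time.monotonic()
    starts = {}

    async def job(i):
        async with IntervalSlot(sem, 0.2):
            starts[i] = time.monotonic() - begin
            await asyncio.sleep(0.01)  # stand-in for a fast HTTP request

    await asyncio.gather(*(job(i) for i in range(3)))
    return starts, time.monotonic() - begin


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each request would then read `async with IntervalSlot(sem, 10): return await fetch(session, url)`. In the demo, the third job waits about 0.2 s for a slot, but `gather` returns right after it finishes, not after its own interval.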

import asyncio
import time
from pprint import pprint

import aiohttp

# API_KEY, API_TOKEN and board (the Trello board JSON) are defined elsewhere.


async def fetch(session, url):
    async with session.get(url, params={
        'key': API_KEY,
        'token': API_TOKEN,
    }) as response:
        return await response.json()


async def throttled_coroutine(coro, sem, timeout=10):
    # Trello API: no more than 100 requests (sem) per 10 seconds interval (timeout)
    await sem.acquire()
    semaphore_has_been_released = False
    fut = asyncio.ensure_future(coro)
    start = time.monotonic()
    try:
        # shield() keeps fut running if wait_for() gives up after `timeout`
        result = await asyncio.wait_for(asyncio.shield(fut), timeout)
    except asyncio.TimeoutError:
        # slow request: free the slot now, then keep waiting for the result
        print('Manually releasing lock')
        sem.release()
        semaphore_has_been_released = True  # avoid a second release below
        result = await fut
    # waiting further if necessary
    # FIXME: relies on the private sem._waiters, and the last coroutines
    # of a batch still tend to wait for no reason
    if sem._waiters:  # None or empty when nobody waits, depending on Python version
        time_elapsed = time.monotonic() - start
        wait_time = max(timeout - time_elapsed, 0)
        await asyncio.sleep(wait_time)
    if not semaphore_has_been_released:
        sem.release()
    return result


async def fetchAll():
    cards = board['cards'][:199]  # list of Trello cards
    sem = asyncio.BoundedSemaphore(100)
    async with aiohttp.ClientSession() as session:
        start = time.monotonic()
        tasks = [
            asyncio.ensure_future(throttled_coroutine(
                fetch(session, 'https://api.trello.com/1/card/{id}/actions/'.format(**card)),
                sem))
            for card in cards
        ]
        results = await asyncio.gather(*tasks)
        time_elapsed = time.monotonic() - start
        print(time_elapsed)
    pprint({card['id']: (len(comments) if isinstance(comments, list) else comments)
            for card, comments in zip(cards, results)})


asyncio.run(fetchAll())