Skip to content

Instantly share code, notes, and snippets.

@cdunklau
Last active October 19, 2017 23:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cdunklau/58b444dcde0b8e1899b0475a49e5a538 to your computer and use it in GitHub Desktop.
Save cdunklau/58b444dcde0b8e1899b0475a49e5a538 to your computer and use it in GitHub Desktop.
Constrain the number of simultaneous HTTP requests with asyncio
import asyncio
import itertools
import aiohttp
import async_timeout
async def fetch_with_response_delay(session, delay):
    """Request httpbin's ``/delay/<delay>`` endpoint and return its JSON body.

    :param session: object whose ``.get(url)`` is an async context manager
        yielding a response with an awaitable ``.json()``; the callers in
        this file pass an ``aiohttp.ClientSession``.
    :param delay: number of seconds the server should wait before
        responding; must satisfy ``0 <= delay <= 10``.
    :returns: the decoded JSON payload of the response.
    :raises ValueError: if ``delay`` is outside the accepted range. The
        check runs before any network activity.
    """
    if not 0 <= delay <= 10:
        raise ValueError('Delay must be between 0 and 10 inclusive')
    url = 'http://httpbin.org/delay/{0}'.format(delay)
    # Use the asynchronous context manager: the plain ``with`` form of
    # async_timeout.timeout() was deprecated and later removed upstream.
    async with async_timeout.timeout(15):
        async with session.get(url) as response:
            return await response.json()
class CoroutineLimiter:
    """
    Run coroutines so that at most `limit` of them are active at once.

    Inspired by twisted.internet.defer.DeferredSemaphore

    If `invoke_as_tasks` is true, wrap the invoked coroutines in Task
    objects. This will ensure that the coroutines happen in the
    same order `.invoke()` was called, if the tasks are given
    to `asyncio.gather`.

    :param limit: maximum number of concurrently running invocations;
        must be greater than zero.
    :param loop: optional event loop used only to create tasks when
        `invoke_as_tasks` is true. Defaults to the loop returned by
        `asyncio.get_event_loop()` at the time `.invoke()` is called.
    :param invoke_as_tasks: when true, `.invoke()` returns a Task
        instead of a bare coroutine object.
    :raises ValueError: if `limit` is zero or negative.
    """

    def __init__(self, limit, *, loop=None, invoke_as_tasks=False):
        if limit <= 0:
            raise ValueError('Limit must be nonzero and positive')
        self._limit = limit
        self._loop = loop
        # The semaphore is created lazily inside the first running
        # invocation. asyncio.Semaphore no longer accepts a ``loop``
        # argument (removed in Python 3.10), and creating it from within
        # a coroutine guarantees it is tied to the loop that is actually
        # running on every Python version.
        self._sem = None
        self._count = itertools.count(1)
        self._invoke_as_tasks = invoke_as_tasks

    def invoke(self, coro_callable, *args):
        """Schedule ``coro_callable(*args)`` to run under the limit.

        Returns a Task when `invoke_as_tasks` was requested, otherwise a
        coroutine object that the caller must await or schedule.
        """
        coro = self._invoke(coro_callable, *args)
        if not self._invoke_as_tasks:
            return coro
        loop = self._loop
        if loop is None:
            # Resolve the loop as late as possible so that constructing
            # the limiter never needs an event loop to exist.
            loop = asyncio.get_event_loop()
        return loop.create_task(coro)

    async def _invoke(self, coro_callable, *args):
        """Wait for a free slot, run the coroutine, then free the slot."""
        n = next(self._count)
        fmt = 'Acquiring semaphore for coroutine {count} with args {args}'
        print(fmt.format(count=n, args=args))
        if self._sem is None:
            # No await between the check and the assignment, so concurrent
            # invocations cannot create two semaphores.
            self._sem = asyncio.Semaphore(self._limit)
        # ``async with`` releases the slot even if the body raises or the
        # invocation is cancelled.
        async with self._sem:
            fmt = (
                'Semaphore acquired. Invoking coroutine {count} '
                'with args {args}'
            )
            print(fmt.format(count=n, args=args))
            try:
                return await coro_callable(*args)
            finally:
                print('Coroutine {count} finished, releasing semaphore'.format(
                    count=n,
                ))
async def run(loop):
    """Fetch nine delayed httpbin responses, at most four at a time.

    :param loop: the event loop this coroutine is running on. It is
        handed to the CoroutineLimiter for task creation; the aiohttp
        session and ``asyncio.gather`` pick up the running loop on
        their own.
    """
    delays = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    limiter = CoroutineLimiter(4, loop=loop, invoke_as_tasks=True)
    # ``loop=`` is deprecated for ClientSession and was removed from
    # asyncio.gather in Python 3.10 (passing it raises TypeError), so
    # neither call receives it any more.
    async with aiohttp.ClientSession() as session:
        requests = [
            limiter.invoke(fetch_with_response_delay, session, n)
            for n in delays
        ]
        results = await asyncio.gather(*requests)
    print('finished! results: {0}'.format(results))
def main():
    """Script entry point: drive ``run()`` to completion on a fresh loop."""
    # asyncio.get_event_loop() is deprecated when no loop is running, so
    # create the loop explicitly and hand it to run().
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run(loop))
    finally:
        # The loop owns OS resources (selector, self-pipe); always close it.
        loop.close()


if __name__ == '__main__':
    main()
import asyncio
import collections
import aiohttp
import async_timeout
async def fetch_with_response_delay(session, delay):
    """Request httpbin's ``/delay/<delay>`` endpoint and return its JSON body.

    :param session: object whose ``.get(url)`` is an async context manager
        yielding a response with an awaitable ``.json()``; the caller in
        this file passes an ``aiohttp.ClientSession``.
    :param delay: number of seconds the server should wait before
        responding; must satisfy ``0 <= delay <= 10``.
    :returns: the decoded JSON payload of the response.
    :raises ValueError: if ``delay`` is outside the accepted range. The
        check runs before any network activity.
    """
    if not 0 <= delay <= 10:
        raise ValueError('Delay must be between 0 and 10 inclusive')
    url = 'http://httpbin.org/delay/{0}'.format(delay)
    # Use the asynchronous context manager: the plain ``with`` form of
    # async_timeout.timeout() was deprecated and later removed upstream.
    async with async_timeout.timeout(15):
        async with session.get(url) as response:
            return await response.json()
async def simple_limiter(futures, *, limit):
    """Await every item of ``futures`` with at most ``limit`` in flight.

    Coroutine objects are only wrapped in tasks (and therefore only start
    running) when a slot is free. Items that are already running Tasks or
    Futures are awaited but cannot be held back by the limit.

    :param futures: iterable of awaitables (coroutine objects, Tasks or
        Futures). May be empty or shorter than ``limit``.
    :param limit: maximum number of awaitables waited on at once; must be
        greater than zero.
    :returns: list of results in the same order as ``futures``.
    :raises ValueError: if ``limit`` is zero or negative.
    :raises Exception: once everything has finished, the exception of the
        first failed awaitable (in input order) is re-raised.
    """
    if limit <= 0:
        raise ValueError('Limit must be nonzero and positive')
    remaining = collections.deque(futures)
    started = []  # tasks in input order, used to build the result list
    pending = set()
    while remaining or pending:
        # Top up the in-flight set. Wrapping explicitly with ensure_future
        # is required because asyncio.wait() rejects bare coroutines on
        # Python 3.11+.
        while remaining and len(pending) < limit:
            task = asyncio.ensure_future(remaining.popleft())
            started.append(task)
            pending.add(task)
        print('Will wait on {n} futures from total of {tot}'.format(
            n=len(pending), tot=len(pending)+len(remaining)))
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        print('Got {nd} done and {np} pending, total left {tot}'.format(
            nd=len(done), np=len(pending), tot=len(pending)+len(remaining)))
    # Every task is finished here; result() re-raises a stored exception.
    return [task.result() for task in started]
async def run(loop):
    """Fetch nine delayed httpbin responses, at most four at a time.

    Prints whatever ``simple_limiter`` returns once all requests finish.

    :param loop: the event loop this coroutine is running on. Kept for
        interface compatibility; the aiohttp session picks up the running
        loop on its own.
    """
    delays = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    # ``loop=`` is deprecated for ClientSession, so it is no longer passed.
    async with aiohttp.ClientSession() as session:
        # Coroutine objects are created here but do not start running
        # until the limiter schedules them.
        requests = [
            fetch_with_response_delay(session, n)
            for n in delays
        ]
        print('starting...')
        results = await simple_limiter(requests, limit=4)
    print('finished! results: {0}'.format(results))
def main():
    """Script entry point: drive ``run()`` to completion on a fresh loop."""
    # asyncio.get_event_loop() is deprecated when no loop is running, so
    # create the loop explicitly and hand it to run().
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run(loop))
    finally:
        # The loop owns OS resources (selector, self-pipe); always close it.
        loop.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment