Skip to content

Instantly share code, notes, and snippets.

@theSage21
Last active May 30, 2018 09:53
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save theSage21/6aebe5f8eaee82481039f58f68f2cf2e to your computer and use it in GitHub Desktop.
High load simulator based on aiohttp
import aiohttp
import asyncio
import async_timeout
from tqdm import tqdm
TIMEOUT_LIMIT = 50
async def _get(url, json, pbar):
async with aiohttp.ClientSession() as session:
for _ in range(100):
try:
with async_timeout.timeout(TIMEOUT_LIMIT):
async with session.post(url, json=json) as RESP:
await RESP.read()
pbar.update(1)
except Exception as err:
pass
async def rush(url, json, pbar):
while True:
await _get(url, json, pbar)
def run(url, json, n_parallel):
with tqdm() as pbar:
tasks = [rush(url, json, pbar)
for task_id in range(n_parallel)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
if __name__ == '__main__':
import argparse
import json
parser = argparse.ArgumentParser()
parser.add_argument('url')
parser.add_argument('json')
parser.add_argument('-n', '--n_parallel', default=150, action='store')
args = parser.parse_args()
run(args.url, json.loads(args.json), int(args.n_parallel))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment