Skip to content

Instantly share code, notes, and snippets.

@igorcoding
Last active October 11, 2017 13:10
Show Gist options
  • Save igorcoding/41c4b8b8336723979a7ad91fa890ef3d to your computer and use it in GitHub Desktop.
Save igorcoding/41c4b8b8336723979a7ad91fa890ef3d to your computer and use it in GitHub Desktop.
import asyncio
import collections
import time
import uvloop; asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
def chunkify(lst, n):
    """Split ``lst`` into ``n`` consecutive chunks of near-equal size.

    The chunks preserve the original order and, concatenated, reproduce
    ``lst``. Chunk sizes differ by at most one: the first ``len(lst) % n``
    chunks each receive one extra item, so the remainder is spread out
    instead of being piled onto the final chunk.

    Args:
        lst: A sliceable sequence (list, tuple, str, ...).
        n: Number of chunks to produce.

    Yields:
        ``n`` slices of ``lst``. Slices are empty when there are fewer
        items than chunks. Nothing is yielded when ``n`` is negative.

    Raises:
        ZeroDivisionError: On first iteration, if ``n`` is 0.
    """
    base_size, leftover = divmod(len(lst), n)
    start = 0
    for index in range(n):
        # The first `leftover` chunks absorb one remaining item each.
        stop = start + base_size + (1 if index < leftover else 0)
        yield lst[start:stop]
        start = stop
async def exec_for_rps(name, f, threads, rps, monitor_map, loop, args):
    """Launch ``rps`` invocations of ``f`` once per second, forever.

    Every second a fresh batch of ``rps`` calls to
    ``f(monitor_map, name, *args)`` is scheduled on ``loop`` and split into
    ``threads`` groups with ``chunkify``. Each group is awaited by its own
    background task; this coroutine does not wait for a batch to finish
    before sleeping and launching the next one.

    Args:
        name: Key passed through to ``f`` as its second argument.
        f: Coroutine function called as ``f(monitor_map, name, *args)``.
        threads: Number of groups each batch is split into. These are
            asyncio task groups, not OS threads.
        rps: Calls to launch per one-second tick; coerced with ``int()``.
        monitor_map: Mapping passed through to ``f`` as its first argument.
        loop: Event loop the tasks are scheduled on. It is expected to be
            the loop running this coroutine.
        args: Extra positional arguments forwarded to ``f``.

    Returns:
        ``None`` immediately when ``int(rps) == 0``; otherwise the coroutine
        runs until it is cancelled.
    """
    rps = int(rps)
    if rps == 0:
        return

    # The event loop only keeps weak references to tasks, so hold strong
    # references to the in-flight group waiters until they complete.
    in_flight = set()

    while True:
        # Wrap every call in a task up front: asyncio.wait() no longer
        # accepts bare coroutine objects on current Python versions.
        calls = [
            asyncio.ensure_future(f(monitor_map, name, *args), loop=loop)
            for _ in range(rps)
        ]
        for group in chunkify(calls, threads):
            if not group:
                # asyncio.wait() raises ValueError on an empty collection,
                # which happens when there are fewer calls than groups.
                continue
            waiter = asyncio.ensure_future(asyncio.wait(group), loop=loop)
            in_flight.add(waiter)
            waiter.add_done_callback(in_flight.discard)
        # No explicit ``loop=``: that keyword was removed from asyncio.sleep
        # and asyncio.wait in Python 3.10; both use the running loop.
        await asyncio.sleep(1)
async def monitor(m, loop):
    """Print the per-key rate of change of ``m`` once per second, forever.

    On each pass the current value of every key in ``m`` is compared with
    the value seen on the previous pass, divided by the elapsed time, and
    printed on one line as ``'<key> rps: <rate>.'`` entries separated by
    spaces.

    Args:
        m: Mapping of key to a numeric counter that other coroutines
            increment.
        loop: Unused; kept so existing callers keep working. The sleep
            runs on whichever loop is executing this coroutine.

    Returns:
        Never returns normally; runs until cancelled.
    """
    # Start from a zero baseline for every key already present.
    # NOTE: the first report is taken right after this baseline, so its
    # interval is close to zero and its figures are not meaningful.
    previous_counts = dict.fromkeys(m, 0)
    # Monotonic clock: interval measurement must not be affected by
    # wall-clock adjustments.
    previous_time = time.monotonic()
    while True:
        now = time.monotonic()
        elapsed = now - previous_time

        rates = {}
        for key, count in m.items():
            delta = count - previous_counts.get(key, 0)
            # Two clock reads can be equal on the first pass; report 0
            # instead of raising ZeroDivisionError.
            rates[key] = delta / elapsed if elapsed > 0 else 0.0

        print(' '.join(f'{key} rps: {rate:.2f}.' for key, rate in rates.items()))

        previous_counts = m.copy()
        previous_time = now
        # No ``loop=``: the keyword was removed from asyncio.sleep in 3.10.
        await asyncio.sleep(1)
async def target_func(m, name):
await asyncio.sleep(0.1)
m[name] += 1
async def main(loop):
    """Run the load generator and the rate monitor side by side.

    Starts ``exec_for_rps`` for ``target_func`` (2 groups, 800 calls per
    second) together with ``monitor``, both sharing one counter map, and
    waits on them.

    Args:
        loop: Event loop both tasks are scheduled on.

    Raises:
        Whatever exception either task raises. Previously a failure in the
        load generator was lost because its task was never awaited.
    """
    monitor_map = collections.defaultdict(int)
    threads = 2
    rps = 800

    # Keep references to both tasks so they can be awaited and cleaned up.
    generator_task = asyncio.ensure_future(
        exec_for_rps('target', target_func, threads, rps, monitor_map, loop, []),
        loop=loop
    )
    monitor_task = asyncio.ensure_future(monitor(monitor_map, loop), loop=loop)
    try:
        # gather() re-raises the first failure from either task.
        await asyncio.gather(generator_task, monitor_task)
    finally:
        # Do not leave the sibling task running once this coroutine exits;
        # cancel() is a no-op on tasks that are already done.
        generator_task.cancel()
        monitor_task.cancel()
if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop is running is deprecated. new_event_loop() still goes through
    # the installed event loop policy.
    _loop = asyncio.new_event_loop()
    asyncio.set_event_loop(_loop)
    try:
        _loop.run_until_complete(main(_loop))
    finally:
        # Release the loop's resources even when the run is interrupted.
        _loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment