Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Created January 10, 2017 15:32
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save vxgmichel/63493a6cd0831ca4cc84430ef1c61a0e to your computer and use it in GitHub Desktop.
Save vxgmichel/63493a6cd0831ca4cc84430ef1c61a0e to your computer and use it in GitHub Desktop.
Command line interface for monitoring asyncio tasks
"""Command line interface for monitoring asyncio tasks."""
import os
import signal
import asyncio
import argparse
import traceback
import linecache
from itertools import count
from functools import partial
from contextlib import contextmanager, suppress
from threading import Thread
from concurrent.futures import Executor
from aioconsole import AsynchronousCli, interact
from aioconsole import start_interactive_server
from textwrap import wrap
from terminaltables import AsciiTable
# Utils
def _get_stack(task):
frames = []
coro = task._coro
while coro:
if hasattr(coro, 'cr_frame') or hasattr(coro, 'gi_frame'):
f = coro.cr_frame if hasattr(coro, 'cr_frame') else coro.gi_frame
else:
f = None
if f is not None:
frames.append(f)
if hasattr(coro, 'cr_await') or hasattr(coro, 'gi_yieldfrom'):
coro = (coro.cr_await if hasattr(coro, 'cr_await')
else coro.gi_yieldfrom)
else:
coro = None
return frames
def _format_stack(task):
    """Return a traceback-style description of where *task* is suspended.

    Each frame reported by ``_get_stack`` becomes one
    ``(filename, lineno, name, line)`` entry rendered through
    ``traceback.format_list``.  When there are no frames, a short
    "No stack" message is returned instead.
    """
    refreshed = set()
    entries = []
    for frame in _get_stack(task):
        code = frame.f_code
        lineno = frame.f_lineno
        path = code.co_filename
        # Refresh the linecache entry once per file before reading from it.
        if path not in refreshed:
            refreshed.add(path)
            linecache.checkcache(path)
        source = linecache.getline(path, lineno, frame.f_globals)
        entries.append((path, lineno, code.co_name, source))
    if not entries:
        return 'No stack for %r' % task
    header = 'Stack for %r (most recent call last):\n' % task
    return header + ''.join(traceback.format_list(entries))
def task_by_id(taskid, loop):
    """Return the task of *loop* whose ``id()`` equals *taskid*, or ``None``.

    ``asyncio.Task.all_tasks`` was removed in Python 3.9, so the module-level
    ``asyncio.all_tasks`` is used when it exists and the old classmethod is
    only a fallback for interpreters that predate it.  Note that
    ``asyncio.all_tasks`` only reports tasks that are not done yet.

    When *loop* is ``None`` the current event loop is used.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    all_tasks = getattr(asyncio, 'all_tasks', None)
    if all_tasks is None:
        all_tasks = asyncio.Task.all_tasks
    for task in all_tasks(loop):
        if id(task) == taskid:
            return task
    return None
async def cancel_task(task):
    """Cancel *task* and wait for it to finish.

    The ``CancelledError`` raised while awaiting the cancelled task is
    swallowed, so the coroutine returns ``None`` in that case.
    """
    try:
        task.cancel()
        await task
    except asyncio.CancelledError:
        pass
class Proxy:
    """Attribute proxy that runs the target's coroutine functions on *loop*.

    Plain attributes and regular callables of the wrapped object are returned
    unchanged.  Coroutine functions are replaced by a wrapper that schedules
    the call on the proxy's loop with ``asyncio.run_coroutine_threadsafe``
    and awaits the outcome from the caller's own loop.

    Parameters:
        obj: the object to wrap.
        loop: the event loop that must execute the object's coroutines;
            defaults to ``asyncio.get_event_loop()``.
    """

    def __init__(self, obj, loop=None):
        if loop is None:
            loop = asyncio.get_event_loop()
        self._proxy_obj = obj
        self._proxy_loop = loop

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If the proxy's own
        # state is missing (instance built without __init__, e.g. by copy),
        # looking up self._proxy_obj below would re-enter this method forever.
        if name.startswith('_proxy_'):
            raise AttributeError(name)
        obj = getattr(self._proxy_obj, name)
        if asyncio.iscoroutinefunction(obj):
            return self._proxy_wrap(obj)
        return obj

    def _proxy_wrap(self, corofn):
        """Return a coroutine function running *corofn* on the proxy loop."""
        async def wrapper(*args, **kwargs):
            coro = corofn(*args, **kwargs)
            future = asyncio.run_coroutine_threadsafe(coro, self._proxy_loop)
            # Bridge the concurrent future back into the awaiting loop.
            return await asyncio.wrap_future(future)
        return wrapper
class AsyncioExecutor(Executor):
    """Executor that runs coroutine functions on a private event loop.

    A new event loop is created and driven by ``run_forever`` in a dedicated
    thread.  ``submit`` schedules a coroutine on that loop and returns the
    ``concurrent.futures.Future`` produced by
    ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = Thread(target=self._target)
        self._thread.start()

    def _target(self):
        """Thread body: run the loop until stopped, then release it."""
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        finally:
            # Close the loop in the thread that ran it so its resources are
            # released whether or not shutdown() waits for the thread.
            self._loop.close()

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` and schedule the result on the loop.

        *fn* must return a coroutine.  Returns a concurrent future.
        """
        coro = fn(*args, **kwargs)
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def shutdown(self, wait=True):
        """Stop the loop; when *wait* is true, join the loop thread.

        Calling it again after the loop has been closed is a no-op.
        """
        # call_soon_threadsafe raises RuntimeError on a closed loop, which is
        # exactly the "already shut down" case.
        with suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._loop.stop)
        if wait:
            self._thread.join()
# Commands
async def command_ps(reader, writer, loop=None):
    """Write an ASCII table of the tasks of *loop* to *writer*.

    One row per task, sorted by ``id()``: the task id, its ``_state`` and its
    string form wrapped at 80 columns.  The table is written as encoded
    bytes followed by a newline.

    ``asyncio.Task.all_tasks`` was removed in Python 3.9, so
    ``asyncio.all_tasks`` is used when available (it only reports tasks that
    are not done yet) and the old classmethod is a fallback.
    """
    all_tasks = getattr(asyncio, 'all_tasks', None)
    if all_tasks is None:
        all_tasks = asyncio.Task.all_tasks
    table_data = [('Task ID', 'State', 'Task')]
    for task in sorted(all_tasks(loop), key=id):
        description = '\n'.join(wrap(str(task), 80))
        table_data.append((id(task), task._state, description))
    table = AsciiTable(table_data)
    writer.write(table.table.encode())
    writer.write(b'\n')
async def command_where(reader, writer, taskid, loop=None):
    """Write the formatted stack of the task identified by *taskid*.

    The task is looked up with ``task_by_id``.  When none is found, a
    "No task" message is written instead.  All output is encoded before
    being handed to *writer*.
    """
    def emit(text):
        writer.write(text.encode())

    found = task_by_id(taskid, loop)
    if not found:
        emit('No task %d\n' % taskid)
        return
    emit(_format_stack(found))
    emit('\n')
async def command_signal(reader, writer, signame, loop=None):
    """Send the signal designated by *signame* to the current process.

    *signame* may be a signal name such as ``'SIGINT'``, a signal number, or
    a string of digits.  It is resolved through the ``signal.Signals`` enum,
    so module attributes that are not signals (``SIG_DFL``, functions, ...)
    are rejected.  For anything that does not resolve, an "Unknown signal"
    message is written to *writer* and nothing is sent.
    """
    try:
        if isinstance(signame, int):
            signum = signal.Signals(signame)
        elif str(signame).isdigit():
            signum = signal.Signals(int(signame))
        else:
            signum = signal.Signals[signame]
    except (KeyError, ValueError):
        msg = 'Unknown signal {}\n'.format(signame)
        writer.write(msg.encode())
        return
    os.kill(os.getpid(), signum)
async def command_cancel(reader, writer, taskid, loop=None):
    """Cancel the task identified by *taskid* and report the outcome.

    The task is looked up with ``task_by_id``.  When found, ``cancel_task``
    is scheduled on *loop* and awaited before the confirmation is written;
    otherwise a "No task" message is written.
    """
    def emit(text):
        writer.write(text.encode())

    target = task_by_id(taskid, loop)
    if not target:
        emit('No task %d\n' % taskid)
        return
    # The cancellation runs on *loop*; its concurrent future is bridged back
    # so that this coroutine can await it.
    pending = asyncio.run_coroutine_threadsafe(cancel_task(target), loop=loop)
    await asyncio.wrap_future(pending)
    emit('Cancel task %d\n' % taskid)
async def command_exit(reader, writer, loop=None):
    """Write the farewell message to *writer* and wait for it to be flushed.

    Like the other commands, the text is encoded before being written, and
    the writer's ``drain()`` coroutine is awaited in place of a ``flush()``
    call.
    """
    writer.write('Leaving monitor. Hit Ctrl-C to exit\n'.encode())
    await writer.drain()
async def command_console(reader, writer, loop=None):
    """Run an interactive console on *loop* using the given streams.

    *reader* and *writer* are wrapped in ``Proxy`` objects before being
    passed to ``interact``, which is scheduled on *loop*.  This coroutine
    waits until that call has finished.
    """
    proxied_reader = Proxy(reader)
    proxied_writer = Proxy(writer)
    console = interact(streams=(proxied_reader, proxied_writer), stop=False)
    pending = asyncio.run_coroutine_threadsafe(console, loop)
    await asyncio.wrap_future(pending)
# Monitoring
def make_cli(streams=None, loop=None):
    """Build the ``monitor`` command line interface.

    Every command is a ``(coroutine function, argparse parser)`` pair; the
    coroutine functions are bound to *loop* with ``functools.partial``.

    Parameters:
        streams: passed through to ``AsynchronousCli``.
        loop: the event loop the commands inspect and act upon.
    """
    ps_parser = argparse.ArgumentParser(
        description="Show task table")

    where_parser = argparse.ArgumentParser(
        description="Show stack frames for a task")
    where_parser.add_argument(
        'taskid', metavar='TID', type=int, help='task identifier')

    cancel_parser = argparse.ArgumentParser(
        description="Cancel an indicated task")
    cancel_parser.add_argument(
        'taskid', metavar='TID', type=int, help='task identifier')

    signal_parser = argparse.ArgumentParser(
        description="Send a Unix signal")
    # The command looks the signal up by name, so it must be parsed as a
    # string; with type=int a name such as SIGINT could never be entered.
    signal_parser.add_argument(
        'signame', metavar='SIG', type=str, help='signal name')

    console_parser = argparse.ArgumentParser(
        description="Switch to async python console.")

    commands = {
        'ps': (partial(command_ps, loop=loop), ps_parser),
        'where': (partial(command_where, loop=loop), where_parser),
        'cancel': (partial(command_cancel, loop=loop), cancel_parser),
        'signal': (partial(command_signal, loop=loop), signal_parser),
        'console': (partial(command_console, loop=loop), console_parser),
    }
    return AsynchronousCli(commands, streams, prog='monitor')
@contextmanager
def asyncio_monitor(host=None, port=8888, loop=None):
    """Context manager serving the monitor CLI while the body runs.

    An ``AsyncioExecutor`` is started and ``start_interactive_server`` is
    submitted to it with a handler that builds the CLI for *loop*.  On exit,
    including when the body raises, the server is closed and awaited before
    the executor shuts down.

    Parameters:
        host: passed to ``start_interactive_server``.
        port: passed to ``start_interactive_server``.
        loop: the event loop to monitor; defaults to
            ``asyncio.get_event_loop()``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    handler = partial(make_cli, loop=loop)

    async def stop_server(server):
        # A plain coroutine replaces asyncio.coroutine(server.close), since
        # asyncio.coroutine was removed in Python 3.11.
        server.close()
        await server.wait_closed()

    with AsyncioExecutor() as executor:
        future = executor.submit(start_interactive_server, handler, host, port)
        server = future.result()
        try:
            yield
        finally:
            executor.submit(stop_server, server).result()
if __name__ == '__main__':
    # Demo: two endless counters run on the monitored loop while the monitor
    # server is available.  Stop with Ctrl-C.

    async def main():
        for x in count():
            print(await asyncio.sleep(1, result=x))

    async def background():
        for x in count(0, -1):
            print(await asyncio.sleep(1, result=x))

    # Create the loop explicitly instead of relying on the implicit creation
    # done by asyncio.get_event_loop(), and hand it to the monitor.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        with asyncio_monitor(loop=loop):
            bgtask = loop.create_task(background())
            maintask = loop.create_task(main())
            try:
                loop.run_until_complete(maintask)
            except KeyboardInterrupt:
                pass
            finally:
                # Cancel both tasks and let the loop process the
                # cancellation before it is closed.
                bgtask.cancel()
                maintask.cancel()
                loop.run_until_complete(
                    asyncio.gather(bgtask, maintask, return_exceptions=True))
    finally:
        loop.close()
@AviKKi
Copy link

AviKKi commented Apr 5, 2018

Really helpful gist, that's exactly what I was looking for

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment