Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Last active August 28, 2023 18:22
Show Gist options
  • Save vxgmichel/2317738886c75e9626d92c326196789f to your computer and use it in GitHub Desktop.
Save vxgmichel/2317738886c75e9626d92c326196789f to your computer and use it in GitHub Desktop.
Trio instrument to detect and log blocking tasks
import time
import inspect
import traceback
import trio
import structlog
logger = structlog.get_logger()
def format_stack(coro):
if hasattr(coro, "cr_code"):
module = inspect.getmodule(coro.cr_code)
if module.__name__.startswith("trio."):
return
if hasattr(coro, "cr_frame"):
yield from traceback.format_stack(coro.cr_frame)
if hasattr(coro, "cr_await"):
yield from format_stack(coro.cr_await)
class TaskMonitoringInstrument(trio.abc.Instrument):
def __init__(self, threshold=0.05):
self.start_dict = {}
self.threshold = threshold
def before_task_step(self, task):
self.start_dict[task] = time.time()
def after_task_step(self, task):
if task not in self.start_dict:
return
delta = time.time() - self.start_dict.pop(task)
if delta <= self.threshold:
return
if task.coro.cr_frame is None:
msg = f"The last step for task `{task.name}` took {delta:.3f} s"
else:
msg = f"The previous step for task `{task.name}` took {delta:.3f} s"
msg += "\nResuming afer:\n"
msg += "".join(format_stack(task.coro)).rstrip()
logger.warning(msg)
# Testing
async def task1():
await trio.sleep(0.9)
time.sleep(0.1)
async def task2():
await subtask2()
async def subtask2():
await subsubtask2()
async def subsubtask2():
await trio.sleep(0.7)
time.sleep(0.2)
await trio.sleep(0.1)
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(task1)
nursery.start_soon(task2)
if __name__ == "__main__":
instruments = [TaskMonitoringInstrument()]
trio.run(main, instruments=instruments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment