Skip to content

Instantly share code, notes, and snippets.

@jponf
Last active September 8, 2021 08:36
Show Gist options
  • Save jponf/cb8efe2e5ae6da86ee47f200cc13b7d5 to your computer and use it in GitHub Desktop.
Save jponf/cb8efe2e5ae6da86ee47f200cc13b7d5 to your computer and use it in GitHub Desktop.
Python stacktracer
# -*- coding: utf-8 -*-
"""Stack tracer for multi-threaded applications.

This code is a copy of https://pypi.python.org/pypi/stacktracer/0.1.2
with minor modifications.

Usage:

    import stacktracer
    stacktracer.trace_start("trace.html", interval=5, auto=True)
    # Set the auto flag to always update the file!
    ....
    stacktracer.trace_stop()
"""
# pylint: disable=protected-access
import os
import sys
import time
import threading
import traceback
try:
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
HAS_PYGMENTS = True
except ImportError:
HAS_PYGMENTS = False
def get_stacktraces():
    """Get the stack traces from each thread.

    Returns:
        A string with one section per thread reported by
        ``sys._current_frames()``: a ``# Thread <name>, ID <id>`` header
        followed by one entry per frame of that thread's stack. When
        ``HAS_PYGMENTS`` is true the text is returned as highlighted HTML,
        otherwise it is returned as plain text.
    """
    # Names are only known for threads registered with the `threading` module.
    id_to_name = {thread.ident: thread.name
                  for thread in threading.enumerate()}

    code = []
    for thread_id, stack in sys._current_frames().items():
        # A thread may have frames without being listed by
        # threading.enumerate() (started via `_thread`/C code, or started
        # after the snapshot above), so never index the mapping directly.
        thread_name = id_to_name.get(thread_id, "unknown")
        code.append("\n# Thread {}, ID {}".format(thread_name, thread_id))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))

    text = "\n".join(code)
    if HAS_PYGMENTS:
        return highlight(text,
                         PythonLexer(),
                         HtmlFormatter(full=False,
                                       # style="native",
                                       noclasses=True))
    return text
class TraceDumper(threading.Thread):
    """Dump stack traces into a given file periodically."""

    def __init__(self, fpath, interval, auto):
        """Set up the dumper thread; the thread is not started here.

        :param fpath: File path to output HTML (stack trace file)
        :param interval: In seconds: how often to update the trace file.
            Must be greater than 0.1.
        :param auto: Set flag (True) to update trace continuously.
            Clear flag (False) to update only if file not exists.
            (Then delete the file to force update.)
        """
        assert interval > 0.1
        threading.Thread.__init__(self, name="stackTracer")
        self.auto = auto
        self.interval = interval
        self.fpath = os.path.abspath(fpath)
        # Set by stop() to ask run() to finish.
        self.stop_requested = threading.Event()

    def run(self):
        """Write the trace file every ``interval`` seconds until stopped."""
        # Event.wait() is used as an interruptible sleep: it returns True as
        # soon as stop() sets the event, so stopping does not have to wait
        # for the remainder of the interval.
        while not self.stop_requested.wait(self.interval):
            if self.auto or not os.path.isfile(self.fpath):
                self.dump_stacktraces()

    def stop(self):
        """Ask the thread to finish, wait for it and remove the trace file."""
        self.stop_requested.set()
        self.join()
        try:
            if os.path.isfile(self.fpath):
                os.unlink(self.fpath)
        except OSError:
            # Removing the trace file is best effort; a failure to delete it
            # must not make stopping the tracer fail.
            pass

    def dump_stacktraces(self):
        """Overwrite the trace file with the current stack traces."""
        with open(self.fpath, "w", encoding="utf-8") as fout:
            fout.write(get_stacktraces())
###############################################################################
# Module-level tracer state: None while no tracer is registered; trace_start()
# stores the TraceDumper it creates here and trace_stop() resets it to None.
_TRACER = None
def trace_start(fpath, interval=5, auto=True):
    """Start tracing into the given file.

    :param fpath: Path of the trace file, forwarded to ``TraceDumper``.
    :param interval: Seconds between updates, forwarded to ``TraceDumper``.
    :param auto: Update policy flag, forwarded to ``TraceDumper``.
    :raises Exception: If a tracer is already registered.
    """
    global _TRACER  # pylint: disable=global-statement
    if _TRACER is not None:
        raise Exception("Already tracing to %s" % _TRACER.fpath)

    tracer = TraceDumper(fpath, interval, auto)
    # Daemon thread: the tracer must not keep the interpreter alive.
    # (The `daemon` attribute replaces the deprecated setDaemon().)
    tracer.daemon = True
    tracer.start()
    # Register the tracer only once it is actually running, so a failed
    # start does not leave a dead tracer behind in the module state.
    _TRACER = tracer
def trace_stop():
    """Stop the registered tracer and clear the module-level reference.

    :raises Exception: If no tracer is currently registered.
    """
    global _TRACER  # pylint: disable=global-statement
    # Guard clause: nothing to stop when tracing was never started.
    if _TRACER is None:
        raise Exception("Not tracing, cannot stop.")
    _TRACER.stop()
    _TRACER = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment