Last active
September 8, 2021 08:36
-
-
Save jponf/cb8efe2e5ae6da86ee47f200cc13b7d5 to your computer and use it in GitHub Desktop.
Python stacktracer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""Stack tracer for multi-threaded applications.

This code is a copy of https://pypi.python.org/pypi/stacktracer/0.1.2
with minor modifications.

Usage:

    import stacktracer
    stacktracer.trace_start("trace.html", interval=5, auto=True)
    # Set auto flag to always update file!
    ....
    stacktracer.trace_stop()
"""
# pylint: disable=protected-access | |
import os | |
import sys | |
import time | |
import threading | |
import traceback | |
try: | |
from pygments import highlight | |
from pygments.lexers import PythonLexer | |
from pygments.formatters import HtmlFormatter | |
HAS_PYGMENTS = True | |
except ImportError: | |
HAS_PYGMENTS = False | |
def get_stacktraces():
    """Get the stack traces from each thread.

    Returns:
        str: One section per thread reported by ``sys._current_frames()``.
        Each section starts with a ``# Thread <name>, ID <id>`` header and
        is followed by one ``File: ...`` entry per stack frame (plus the
        stripped source line when it is available). When pygments could be
        imported the text is returned as highlighted HTML with inline
        styles; otherwise it is returned as plain text.
    """
    # Snapshot the frames first, then the names, so that a thread which is
    # already present in the frame snapshot is as likely as possible to be
    # present in the name map too.
    frames = sys._current_frames()
    id_to_name = {thread.ident: thread.name
                  for thread in threading.enumerate()}

    code = []
    for thread_id, stack in frames.items():
        # A plain ``id_to_name[thread_id]`` raises KeyError whenever the
        # interpreter reports a frame for a thread that
        # ``threading.enumerate()`` did not list; fall back to a placeholder
        # instead of aborting the whole dump.
        thread_name = id_to_name.get(thread_id, "unknown")
        code.append("\n# Thread {}, ID {}".format(thread_name, thread_id))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, lineno, name))
            if line:
                code.append(" {}".format(line.strip()))

    text = "\n".join(code)
    if HAS_PYGMENTS:
        return highlight(text,
                         PythonLexer(),
                         HtmlFormatter(full=False,
                                       # style="native",
                                       noclasses=True))
    return text
class TraceDumper(threading.Thread):
    """Dump stack traces into a given file periodically.

    The thread wakes up every ``interval`` seconds and writes the output of
    ``get_stacktraces()`` to ``fpath`` until ``stop()`` is called.
    """

    def __init__(self, fpath, interval, auto):
        """
        :param fpath: File path to output HTML (stack trace file)
        :param auto: Set flag (True) to update trace continuously.
            Clear flag (False) to update only if file not exists.
            (Then delete the file to force update.)
        :param interval: In seconds: how often to update the trace file.
        """
        # Kept as an assert so the exception type seen by callers does not
        # change; note that asserts are skipped when Python runs with -O.
        assert interval > 0.1
        super().__init__(name="stackTracer")
        self.auto = auto
        self.interval = interval
        self.fpath = os.path.abspath(fpath)
        # Set by stop() to ask run() to finish.
        self.stop_requested = threading.Event()

    def run(self):
        """Write a dump every ``interval`` seconds until stop is requested."""
        # Event.wait() is used as an interruptible sleep: it returns False
        # when the timeout elapses and True as soon as stop() sets the
        # event, so stopping does not have to wait out a full interval and
        # no dump is written after a stop request.
        while not self.stop_requested.wait(self.interval):
            if self.auto or not os.path.isfile(self.fpath):
                self.dump_stacktraces()

    def stop(self):
        """Stop the thread, wait for it to exit and remove the trace file."""
        self.stop_requested.set()
        self.join()
        # Best-effort cleanup: failing to delete the file must not make
        # stopping fail, but only file-system errors are ignored.
        try:
            if os.path.isfile(self.fpath):
                os.unlink(self.fpath)
        except OSError:
            pass

    def dump_stacktraces(self):
        """Overwrite the trace file with the current stack traces."""
        with open(self.fpath, "w", encoding="utf-8") as fout:
            fout.write(get_stacktraces())
############################################################################### | |
# The single active TraceDumper, or None when no trace is running.
_TRACER = None


def trace_start(fpath, interval=5, auto=True):
    """Start tracing into the given file.

    :param fpath: File path the stack traces are written to.
    :param interval: In seconds: how often to update the trace file.
    :param auto: True to rewrite the file on every interval, False to write
        it only when it does not exist.
    :raises RuntimeError: If a trace is already running.
    """
    global _TRACER  # pylint: disable=global-statement
    if _TRACER is not None:
        raise RuntimeError("Already tracing to %s" % _TRACER.fpath)

    tracer = TraceDumper(fpath, interval, auto)
    # Daemon thread: the tracer must never keep the interpreter alive.
    tracer.daemon = True
    tracer.start()
    # Publish only after a successful start, so a failure above does not
    # leave the module believing that a trace is running.
    _TRACER = tracer
def trace_stop():
    """Stop tracing.

    :raises RuntimeError: If no trace is currently running.
    """
    global _TRACER  # pylint: disable=global-statement
    if _TRACER is None:
        raise RuntimeError("Not tracing, cannot stop.")
    try:
        _TRACER.stop()
    finally:
        # Always forget the tracer, even if stopping it raised; otherwise
        # the module would stay stuck in the "already tracing" state.
        _TRACER = None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment