Skip to content

Instantly share code, notes, and snippets.

@EstevanTH
Created January 12, 2024 17:23
Show Gist options
  • Save EstevanTH/1240a7744611e02c1080056632c8d85a to your computer and use it in GitHub Desktop.
Save EstevanTH/1240a7744611e02c1080056632c8d85a to your computer and use it in GitHub Desktop.
Traces Web2py requests and logs those that last for longer than the configured threshold
# coding: UTF-8
"""Traces Web2py requests and logs those that last for longer than the configured threshold
The status of the request is displayed in the log output.
This module is very handy to identify stuck request handler threads and bottlenecks.
As of now, it has only been tested on Python 2.7.18.
Setup:
- Place this file in the "site-packages" subdirectory of Web2py.
- Create the file "routes.py" at the root of Web2py, if it does not exist.
- Insert the following lines:
import log_long_lasting_requests
log_long_lasting_requests.start_logging()
Recommended settings.json parameters for Visual Studio Code, due to Python 2 compatibility:
{
"python.analysis.diagnosticSeverityOverrides": {
"reportTypeCommentUsage": "none"
}
}
By Mohamed RACHID
"""
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, print_function, unicode_literals # everything Python 3
from traceback import print_exc
# Parameters:
# Minimum age, in seconds, that an ongoing request execution must exceed to be reported by do_logging().
_EXECUTION_TIME_LOG_THRESHOLD_s = 300. # type: float
# Pause, in seconds, between two consecutive scans of the ongoing executions in do_logging().
_STATUS_LOG_OUTPUT_INTERVAL_s = 180. # type: float # should be lower than _EXECUTION_TIME_LOG_THRESHOLD_s
import datetime
import gluon.main
import threading
from functools import wraps
from os.path import normpath
from time import sleep
try:
from threading import get_ident # type: ignore
except ImportError:
from thread import get_ident # type: ignore # Python 2
class OngoingExecution(object):
    """Progress record of one request that is currently being handled.

    The class-level values below are the defaults seen until an instance
    attribute of the same name is assigned.
    """

    thread_id = -1  # identifier of the thread handling the request
    started_at = datetime.datetime.min  # local time at which handling started
    method = ""  # HTTP request method
    url = ""  # host + path + "?" + query string
    step = "before models"  # last execution stage reached by the request

    def __repr__(self):  # type: () -> str
        """Return a one-line, ASCII-only summary that is convenient when debugging."""
        # %r is used for the free-form text fields so that non-ASCII characters
        # are escaped, which keeps the result safe to print on Python 2 as well.
        return "%s(thread_id=%r, started_at=%s, method=%r, url=%r, step=%r)" % (
            type(self).__name__,
            self.thread_id,
            self.started_at.isoformat(),
            self.method,
            self.url,
            self.step,
        )
# Registry of request executions, keyed by the identifier of the handling thread.
# The wsgibase wrapper installed by start_tracing() stores an OngoingExecution when a
# request starts and resets the entry to None once the request handler returns.
_ongoing_executions = {} # type: dict[int, OngoingExecution | None] # thread_id -> OngoingExecution
#_finished_long_executions = [] # TODO
# Log file: "logs/log_long_lasting_requests.log" inside the parent of the directory that contains this file.
_log_file_path = normpath(__file__ + "/../../logs/log_long_lasting_requests.log")
def _wrap_step(original, step_name):
    """Wrap one Web2py execution stage so that request progress gets recorded.

    Args:
        original: the callable to wrap (e.g. ``gluon.main.run_models_in``).
        step_name: short stage label; the tracked execution's ``step`` becomes
            ``"executing <step_name>"`` while ``original`` runs and
            ``"after <step_name>"`` once it has returned or raised.

    Returns:
        A callable that forwards all its arguments to ``original`` and returns
        (or raises) whatever ``original`` does.
    """
    @wraps(original)
    def _traced_step(*args, **kwargs):
        execution_info = None  # type: OngoingExecution | None
        try:
            # .get() rather than indexing: a thread that was never registered
            # by the wsgibase wrapper simply has nothing to update, and must
            # not produce a KeyError traceback on every call.
            execution_info = _ongoing_executions.get(get_ident())
            if execution_info is not None:
                execution_info.step = "executing " + step_name
        except Exception:
            # Tracing is best-effort: never let it break the request itself.
            print_exc()
        try:
            return original(*args, **kwargs)
        finally:
            try:
                if execution_info is not None:
                    execution_info.step = "after " + step_name
            except Exception:
                print_exc()
    return _traced_step


def start_tracing():  # type: () -> bool  # type: ignore
    """Monkey-patch ``gluon.main`` so that every request execution is tracked.

    ``gluon.main.wsgibase`` is wrapped to register an ``OngoingExecution`` in
    ``_ongoing_executions`` for the duration of each request, and
    ``run_models_in`` / ``run_controller_in`` / ``run_view_in`` are wrapped to
    keep the ``step`` of that record up to date.

    Returns:
        False when ``gluon.main`` does not expose ``wsgibase`` yet (nothing is
        patched in that case, call again later); True once the patches are
        installed. After a successful call this function replaces itself with
        a no-op returning True, so the patches are never applied twice.
    """
    global start_tracing

    try:
        _gluon_main_wsgibase_original = gluon.main.wsgibase
    except AttributeError:
        # Module not fully initialized yet (first call):
        return False

    @wraps(_gluon_main_wsgibase_original)
    def _gluon_main_wsgibase(environ, responder):  # type: (dict[str, object], object) -> object
        thread_id = get_ident()  # type: int
        execution_info = OngoingExecution()
        try:
            execution_info.thread_id = thread_id
            execution_info.started_at = datetime.datetime.now()
            execution_info.method = environ.get("REQUEST_METHOD", "")  # type: ignore
            # HTTP_HOST and QUERY_STRING may legitimately be absent from a WSGI
            # environ; indexing them would raise and leave the request untracked.
            host = environ.get("HTTP_HOST") or environ.get("SERVER_NAME", "")
            execution_info.url = host + environ.get("PATH_INFO", "") + "?" + environ.get("QUERY_STRING", "")  # type: ignore
            _ongoing_executions[thread_id] = execution_info
        except Exception:
            # Tracing is best-effort: never let it break the request itself.
            print_exc()
        try:
            return _gluon_main_wsgibase_original(environ, responder)
        finally:
            try:
                # The key is kept and reset to None: the execution is over.
                _ongoing_executions[thread_id] = None
            except Exception:
                print_exc()

    gluon.main.wsgibase = _gluon_main_wsgibase
    gluon.main.run_models_in = _wrap_step(gluon.main.run_models_in, "models")
    gluon.main.run_controller_in = _wrap_step(gluon.main.run_controller_in, "controller")
    gluon.main.run_view_in = _wrap_step(gluon.main.run_view_in, "view")

    print("Tracing request executions!")

    @wraps(start_tracing)
    def start_tracing():  # type: () -> bool  # type: ignore
        # Once activated, do nothing on next calls.
        return True

    return True
def do_logging():  # type: () -> None
    """Logging thread execution.

    Runs forever: sleeps ``_STATUS_LOG_OUTPUT_INTERVAL_s`` seconds, then
    reports every ongoing execution that started more than
    ``_EXECUTION_TIME_LOG_THRESHOLD_s`` seconds ago. The report is printed and
    then appended to ``_log_file_path``. Any error raised during a cycle is
    printed with its traceback and the loop carries on with the next cycle.
    """
    print("Logging requests executions that last for more than %f seconds!" % _EXECUTION_TIME_LOG_THRESHOLD_s)
    while True:
        sleep(_STATUS_LOG_OUTPUT_INTERVAL_s)
        try:
            now = datetime.datetime.now()
            # Iterate over a shallow copy rather than the live dict: request
            # threads keep adding entries to it, and iterating a dict while
            # entries are added may raise RuntimeError.
            snapshot = dict(_ongoing_executions)
            long_executions = [
                snapshot[thread_id]
                for thread_id in sorted(snapshot)
                if snapshot[thread_id] is not None
                and (now - snapshot[thread_id].started_at).total_seconds() > _EXECUTION_TIME_LOG_THRESHOLD_s
            ]  # type: list[OngoingExecution]
            if not long_executions:
                continue
            log_lines = [
                now.isoformat() + " - Ongoing long request executions:",
            ]  # type: list[str]
            log_lines += [
                " %s %4s %s [%s][%.2f s][%u]" % (
                    execution.started_at.isoformat(),
                    execution.method,
                    execution.url,
                    execution.step,
                    (now - execution.started_at).total_seconds(),
                    execution.thread_id,
                )
                for execution in long_executions
            ]
            # The trailing empty item makes the joined text end with a newline.
            log_lines.append("")
            log_lines_merged = "\n".join(log_lines)  # type: str
            print(log_lines_merged)
            # Written last so that a failure to write the file does not
            # prevent the console output above:
            with open(_log_file_path, "at") as log_output:
                log_output.write(log_lines_merged)
        except Exception:
            # Keep the logging thread alive whatever went wrong in this cycle.
            print_exc()
# Background thread that runs do_logging(); it is only created here and gets started by start_logging().
_thread_logging = threading.Thread(
target=do_logging,
name=__name__ + ".do_logging()",
)
# Daemon thread, so that its endless loop does not keep the interpreter alive at shutdown.
_thread_logging.daemon = True
def start_logging():  # type: () -> None
    """Install the request tracing and, once it is in place, start the logging thread.

    Nothing is started while start_tracing() reports that tracing could not be
    installed, and the logging thread is started at most once: it is left
    alone as soon as it has a thread identifier.
    """
    if not start_tracing():
        return
    if _thread_logging.ident is not None:
        # Already started by a previous call.
        return
    _thread_logging.start()
def get_ongoing_executions():  # type: () -> dict[int, OngoingExecution | None]
    """Return a shallow copy of the thread_id -> execution registry.

    The returned dict is a new object, but its values are the very objects
    held by the registry (None for a thread whose last request has ended).
    """
    snapshot = {}  # type: dict[int, OngoingExecution | None]
    snapshot.update(_ongoing_executions)
    return snapshot
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment