Last active
December 13, 2021 14:35
-
-
Save gunchev/02371dbd2579e0d539bc6bc4610f5a18 to your computer and use it in GitHub Desktop.
Python 2 and 3 TimedFunction and TimedMethod plus colored log on terminal (black Linux terminal, konsole/terminator tested)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- encoding: utf-8 -*- | |
"""Timed function and method decorators, portions from the python decorator module | |
Install 'termcolor' (pip install --user termcolor) for colored output. | |
""" | |
from __future__ import absolute_import, print_function, division, unicode_literals | |
import logging | |
import sys | |
import time | |
class Timed(object): | |
'''Timed function decorator''' | |
def __init__(self, min_delay=0.01, warn_delay=0.1, fmt='{:,.3f}'): | |
self._min_delay = min_delay | |
self._warn_delay = warn_delay | |
self._fmt = fmt | |
@staticmethod | |
def _get_signature(func, *args, **kwargs): | |
'''Create function signature''' | |
arr = [repr(i) for i in args] | |
for k, v in kwargs.items(): | |
arr.append(k + '=' + repr(v)) | |
return '%s(%s)' % (func.__name__, ', '.join(arr)) | |
def __call__(self, func): | |
'''Do the decoration itself''' | |
def decorated(*args, **kwargs): | |
log = logging.getLogger("method_timing") | |
if not log.isEnabledFor(logging.ERROR): | |
return func(*args, **kwargs) | |
signature = self._get_signature(func, *args, **kwargs) | |
log.debug("Timing method %s...", signature) | |
now = time.time() | |
try: | |
result = func(*args, **kwargs) | |
except Exception as exc: | |
duration = time.time() - now | |
log.error("Method %s took %s seconds to fail with %r.", signature, self._fmt.format(duration), exc) | |
raise exc | |
duration = time.time() - now | |
level = logging.DEBUG | |
if duration >= self._warn_delay: | |
level = logging.WARN | |
elif duration >= self._min_delay: | |
level = logging.INFO | |
log.log(level, "Method %s took %s seconds.", signature, self._fmt.format(duration)) | |
return result | |
decorated.__name__ = func.__name__ | |
decorated.__doc__ = func.__doc__ | |
decorated.__defaults__ = func.__defaults__ | |
try: | |
decorated.__kwdefaults__ = func.__kwdefaults__ | |
decorated.__annotations__ = func.__annotations__ | |
except AttributeError: | |
pass | |
try: | |
frame = sys._getframe(1) | |
except AttributeError: # for IronPython and similar implementations | |
callermodule = '?' | |
else: | |
callermodule = frame.f_globals.get('__name__', '?') | |
decorated.__module__ = callermodule | |
return decorated | |
class TimedFunction(Timed): | |
'''Timed function decorator (alias)''' | |
class TimedMethod(Timed): | |
'''Timed class method decorator''' | |
@staticmethod | |
def _get_signature(func, *args, **kwargs): | |
'''Create class method signature''' | |
obj = args[0] | |
arr = [repr(i) for i in args[1:]] | |
for k, v in kwargs.items(): | |
arr.append(k + '=' + repr(v)) | |
return '%s.%s(%s)' % (obj.__class__.__name__, func.__name__, ', '.join(arr)) | |
LOGGING_PATCHED_ALREADY = False | |
def monkey_patch_logging_format(): | |
"""Monkey patches the logging module format error report""" | |
def print_log_record_on_error(func): | |
"""Monkeypatch for logging.LogRecord.getMessage | |
Credits: http://stackoverflow.com/questions/2477934/ | |
""" | |
def wrap(self, *args, **kwargs): | |
"""Generate wrapper function for logging.LogRecord.getMessage""" | |
try: | |
return func(self, *args, **kwargs) | |
except Exception as exc: # pylint: disable=broad-except | |
return "Unable to create log message with msg=%r, args=%r: %r" \ | |
% (getattr(self, 'msg', '?'), getattr(self, 'args', '?'), exc) | |
return wrap | |
# Monkeypatch the logging library for more informative formatting errors | |
logging.LogRecord.getMessage = print_log_record_on_error(logging.LogRecord.getMessage) | |
def monkey_patch_logging_color(): | |
"""Monkey patches the logging module to print in color""" | |
from os import isatty | |
if not isatty(sys.stderr.fileno()): | |
return False | |
try: | |
from termcolor import colored # pylint: disable=import-error | |
def get_formatter(logging_formatter=logging.Formatter): | |
"""Get it... ;-)""" | |
class ColoredFormatter(logging_formatter): | |
"""Color console formatter""" | |
def format(self, record): | |
output = logging_formatter.format(self, record) | |
tail = None | |
comment_pos = output.find(' # ') | |
if comment_pos >= 0: | |
output, tail = output[:comment_pos], output[comment_pos:] | |
if record.levelno <= logging.DEBUG: # pylint: disable=fixme | |
output = colored(text=output, color='grey', on_color=None, attrs=['bold']) | |
elif record.levelno <= logging.INFO: | |
output = colored(text=output, color='green', on_color=None, attrs=['bold']) | |
elif record.levelno <= logging.WARNING: | |
output = colored(text=output, color='yellow', on_color=None, attrs=['bold']) | |
elif record.levelno <= logging.ERROR: | |
output = colored(text=output, color='red', on_color=None, attrs=['bold']) | |
elif record.levelno <= logging.CRITICAL: | |
output = colored(text=output, color='white', on_color='on_red', attrs=['bold']) | |
else: | |
output = colored(text=output, color='yellow', on_color='on_red', | |
attrs=['bold', 'blink', 'underline']) | |
if tail: | |
output += colored(text=tail, color='blue', attrs=['dark']) | |
return output | |
def formatException(self, ei): | |
"""Format and return the specified exception information as a string.""" | |
text = logging_formatter.formatException(self, ei) | |
return colored(text=text, color='white', on_color="on_magenta") | |
return ColoredFormatter | |
logging.Formatter = get_formatter() | |
return True | |
except ImportError: | |
return False | |
def monkey_patch_logging(color_on_terminal=True): | |
"""Monkey patches the logging module | |
Install termcolor to get colored logs on the terminal. It's tiny! | |
Returns True on success, False if colored logs were requested but | |
the termcolor package can not be imported. | |
Example: | |
def main(): | |
'''Test color logging on terminal and logging format error''' | |
monkey_patch_logging(color_on_terminal=True) | |
logging.basicConfig(level=logging.INFO, # pylint: disable=fixme | |
format="%(levelname)-8s: %(message)s # %(filename)s:%(lineno)d %(name)s") | |
log = logging.getLogger('nexus') | |
log.debug("Debug message...") | |
log.info("Info message.") | |
log.warning("Warning message.") | |
log.error("Error message.") | |
log.critical("Critical error message!") | |
log._log(51, "MORE THAN CRITICAL! We are all gonna die now! Nooooooo!", []) | |
log.debug("Bad message format", 1, 2, 3) # and we need 0 arguments, but provide 3! | |
try: | |
raise ValueError("Sample value error exception!") | |
except ValueError: | |
log.exception("Exception log...") | |
log.info("Exiting...") | |
return 0 | |
if __name__ == "__main__": | |
sys.exit(main()) | |
""" | |
global LOGGING_PATCHED_ALREADY # pylint: disable=global-statement | |
if LOGGING_PATCHED_ALREADY: | |
return True | |
LOGGING_PATCHED_ALREADY = True | |
monkey_patch_logging_format() | |
if color_on_terminal: | |
return monkey_patch_logging_color() | |
return True | |
@TimedFunction(min_delay=0.1, warn_delay=0.2, fmt="{:,.6f}") | |
def my_sleep(seconds): | |
'''My sleep function''' | |
# print("my_sleep({}) start".format(seconds)) | |
if seconds < 0: | |
raise ValueError("Invalid sleep interval of {} seconds".format(seconds)) | |
time.sleep(seconds) | |
# print("my_sleep({}) stop".format(seconds)) | |
class Sleeper(object): | |
def __init__(self, sleep_time): | |
self._sleep_time = float(sleep_time) | |
if self._sleep_time < 0.0: | |
raise ValueError("Invalid sleep interval: " + repr(sleep_time)) | |
@TimedMethod(min_delay=0.01, warn_delay=0.21, fmt="{:,.6f}") | |
def run(self, sleep_time=None): | |
'''do the sleeping''' | |
if sleep_time is None: | |
sleep_time = self._sleep_time | |
if sleep_time < 0.0: | |
raise ValueError("Invalid sleep interval: " + repr(sleep_time)) | |
time.sleep(sleep_time) | |
def run_test(): | |
my_sleep(0.01) | |
my_sleep(0.1) | |
my_sleep(0.2) | |
sleeper = Sleeper(0.2) | |
sleeper.run() | |
sleeper.run(0.21) | |
def main(): | |
monkey_patch_logging(color_on_terminal=True) | |
try: | |
logging.basicConfig(level=logging.DEBUG, # pylint: disable=fixme | |
format="%(levelname)-8s: %(message)s # %(filename)s:%(lineno)d", # %(name)s | |
disable_existing_loggers=False) | |
except ValueError: | |
logging.basicConfig(level=logging.DEBUG, # pylint: disable=fixme | |
format="%(levelname)-8s: %(message)s # %(filename)s:%(lineno)d") # %(name)s | |
log = logging.getLogger('global') | |
log.fatal("*** Start DEBUG level ***") | |
run_test() | |
log.fatal("*** END ***") | |
print() | |
logging.root.setLevel(logging.INFO) | |
log.fatal("*** Start INFO level") | |
run_test() | |
log.fatal("*** END ***") | |
print() | |
logging.root.setLevel(logging.ERROR) | |
log.fatal("*** Start ERROR level") | |
run_test() | |
try: | |
my_sleep(-0.1) | |
except ValueError as exc: | |
print("Got ValueError as expected: {}".format(exc)) | |
log.fatal("*** END ***") | |
print() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment