Skip to content

Instantly share code, notes, and snippets.

@davesteele
Last active January 5, 2020 20:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davesteele/65f8c581fb99b3f1fa88963dc0786c97 to your computer and use it in GitHub Desktop.
Save davesteele/65f8c581fb99b3f1fa88963dc0786c97 to your computer and use it in GitHub Desktop.
Capture Python exception text for future logging
#!/usr/bin/python
import io
import traceback
try:
    1.0 / 0.0
except ZeroDivisionError:
    # format_exc() renders the exception currently being handled as a string,
    # so the text can be kept around and logged later.
    exc_text = traceback.format_exc()
    print(exc_text)
#!/usr/bin/python3
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
import gzip
import logging
from logging.handlers import TimedRotatingFileHandler
import os
from queue import Queue, Empty, Full
from threading import Lock
import io
import traceback
# Names exported by a star-import of this module.  ``exception`` is listed
# with the level wrappers because it is a public helper of the same kind.
__all__ = ["debug", "info", "warning", "error", "critical", "exception"]

# Absolute path of the live log file handed to the rotating file handler.
LOG_PATH = "/home/daves/foo/foo.log"

# One deferred log call: the arguments that are later passed on to the
# standard ``logging.Logger._log``.
Entry = namedtuple("Entry", ["level", "msg", "args", "kwargs"])

# Thread pool on which the queue-draining jobs run.
executor = ThreadPoolExecutor()

# Taken (without blocking) by the job that is currently draining ``logq``.
loglock: Lock = Lock()

# Pending log calls; bounded at 1000 items.
logq: Queue = Queue(1000)
class TankLogger(logging.Logger):
    """A nonblocking Logger class.

    Every log call is placed on the module-level ``logq`` and emitted later
    by a job submitted to ``executor``, so the calling thread does not wait
    on the handlers.  When the queue is full the call is dropped.

    The standard ``Logger._log`` runs inside the drain job, so each record
    is built when the queue is drained, not when the log call was made.
    """

    def _log(self, level, msg, args, **kwargs):
        """Divert the standard logging entry point to the deferred path."""
        self._logentry(level, msg, args, **kwargs)

    def parent_log(self, level, msg, args, **kwargs):
        """Emit right away through the stock ``logging.Logger._log``."""
        super()._log(level, msg, args, **kwargs)

    def _logentry(self, level, msg, args, **kwargs):
        """Queue one log call and schedule a background drain of the queue.

        The originating logger is queued together with the entry because the
        queue is shared by every ``TankLogger`` instance; the drain job must
        emit each entry through the logger that received it.
        """
        import sys  # local import: only needed to snapshot exc_info below

        def _flush_log():
            # Non-blocking acquire: if another job is already draining the
            # queue there is nothing to do here.
            if not loglock.acquire(False):
                return
            try:
                while True:
                    logger, entry = logq.get(False)
                    logger.parent_log(
                        entry.level, entry.msg, entry.args, **entry.kwargs
                    )
            except Empty:
                # Queue drained.
                return
            finally:
                loglock.release()

        # A bare truthy ``exc_info`` means "use the exception being handled
        # now".  That exception is not visible from the worker thread, so it
        # is resolved here, on the caller's thread.
        exc_info = kwargs.get("exc_info")
        if exc_info and not isinstance(exc_info, (BaseException, tuple)):
            kwargs["exc_info"] = sys.exc_info()

        try:
            logq.put((self, Entry(level, msg, args, kwargs)), block=False)
        except Full:
            # Deliberate best effort: drop the message rather than block.
            pass

        executor.submit(_flush_log)
def rotator(source: str, dest: str) -> None:
    """Define log rotation action.

    Gzip-compress the file at ``source`` into ``dest``, then delete
    ``source``.

    Args:
        source: Path of the log file being rotated out.
        dest: Path of the compressed archive to create.

    Raises:
        OSError: If either file cannot be opened or ``source`` cannot be
            removed.
    """
    import shutil  # local import keeps this function self-contained

    with open(source, "rb") as infp, gzip.open(dest, "wb") as outfp:
        # Copy in fixed-size chunks instead of iterating the binary file
        # line by line.
        shutil.copyfileobj(infp, outfp)

    # Remove the original only after both files have been closed.
    os.remove(source)
def namer(name: str) -> str:
    """Return the archive file name for a rotated log: ``name`` plus ``.gz``."""
    return f"{name}.gz"
def deflog() -> logging.Logger:
    """Define the system logger.

    Installs ``TankLogger`` as the logger class, then builds the ``"tank"``
    logger at INFO level with a rotating file handler on ``LOG_PATH``.
    The handler uses ``rotator`` and ``namer`` for its archives and is
    configured with ``when="d"``, ``interval=1`` and ``backupCount=8``.

    Note that ``logging.setLoggerClass`` is process-wide: loggers created
    after this call are ``TankLogger`` instances too.

    Returns:
        The configured ``"tank"`` logger.
    """
    logging.setLoggerClass(TankLogger)

    fmtr = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    handler = TimedRotatingFileHandler(
        LOG_PATH, encoding="utf-8", when="d", interval=1, backupCount=8,
    )
    handler.setFormatter(fmtr)
    handler.rotator = rotator
    handler.namer = namer

    log = logging.getLogger("tank")
    log.setLevel(logging.INFO)
    log.addHandler(handler)

    return log
# Module-wide logger used by the wrappers below; built once, at import time.
log = deflog()
def debug(msg: str, *args, **kwargs) -> None:
    """Log ``msg`` at DEBUG level on the module logger."""
    log.log(logging.DEBUG, msg, *args, **kwargs)
def info(msg: str, *args, **kwargs) -> None:
    """Log ``msg`` at INFO level on the module logger."""
    log.log(logging.INFO, msg, *args, **kwargs)
def warning(msg: str, *args, **kwargs) -> None:
    """Log ``msg`` at WARNING level on the module logger."""
    log.log(logging.WARNING, msg, *args, **kwargs)
def error(msg: str, *args, **kwargs) -> None:
    """Log ``msg`` at ERROR level on the module logger."""
    log.log(logging.ERROR, msg, *args, **kwargs)
def critical(msg: str, *args, **kwargs) -> None:
    """Log ``msg`` at CRITICAL level on the module logger."""
    log.log(logging.CRITICAL, msg, *args, **kwargs)
def exception(e: Exception) -> None:
    """Log an exception's class name and traceback at ERROR level.

    The class name is logged first; then, if the exception carries a
    traceback, a ``Traceback`` header followed by one record per traceback
    line.  An exception without a traceback (for example one that was never
    raised) produces only the class-name record.

    NOTE(review): only the class name is logged, not ``str(e)`` -- confirm
    whether the exception message should be included as well.

    Args:
        e: The exception to report.
    """
    error("Exception: {}".format(type(e).__name__))

    # format_tb() returns the same text print_tb() would write to a file.
    tb_text = "".join(traceback.format_tb(e.__traceback__)).strip()
    if not tb_text:
        # No frames recorded: skip the header and the blank record that
        # splitting an empty string would otherwise produce.
        return

    error("Traceback")
    for line in tb_text.split("\n"):
        error(line)
if __name__ == "__main__":
    # Demo run: one plain message, then a deliberately provoked exception
    # reported through exception().
    info("FOO")

    try:
        1.0 / 0.0
    except ZeroDivisionError as err:
        exception(err)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment