Last active
August 24, 2019 21:50
-
-
Save komuw/7db3eeabda830c5868d7248960692f28 to your computer and use it in GitHub Desktop.
create a breach handler for python's standard logging library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
import collections | |
# Inspired by; https://tersesystems.com/blog/2019/07/28/triggering-diagnostic-logging-on-exception/ | |
# Also see; https://docs.python.org/3.6/library/logging.handlers.html#logging.handlers.MemoryHandler | |
# Also see; https://github.com/komuw/naz/blob/e0666550396400f86b9a547932bb9075c520a5d9/naz/log.py#L159-L230 | |
class BreachHandler(logging.StreamHandler): | |
""" | |
Log handler that buffers logs in an in-memory ring buffer until a trigger. | |
When a trigger condition(eg a certain log level) is met; | |
- then all the logs in the buffer are flushed into a given stream(file, stdout etc) | |
""" | |
def __init__(self, trigger_level=logging.WARNING, buffer_size=10_000, stream=None): | |
# call `logging.StreamHandler` init | |
super(BreachHandler, self).__init__(stream=stream) | |
self.trigger_level = trigger_level | |
self.buffer_size = buffer_size | |
self.buffered_logs = collections.deque(maxlen=self.buffer_size) | |
self._action_triggered = False | |
def emit(self, record): | |
""" | |
Implementation is mostly taken from `logging.StreamHandler` | |
""" | |
try: | |
# 1. append new item to deque(ring-buffer) | |
msg = self.format(record) | |
self.buffered_logs.append(msg) | |
# 2. check if the loglevel of the current record >= `self.trigger_level` | |
# if it is. | |
# (a) acquire lock | |
# (b) stream.write and flush from `self.buffered_logs` | |
# (c) clear out `self.buffered_logs` | |
if record.levelno < self.trigger_level: | |
return | |
else: | |
stream = self.stream | |
for _msg in self.buffered_logs: | |
stream.write(_msg) | |
stream.write(self.terminator) | |
self.buffered_logs.clear() | |
self.flush() | |
except Exception: | |
self.handleError(record) | |
# Usage: | |
logger = logging.getLogger("my-fingers-logger") | |
handler = BreachHandler(buffer_size=10) | |
formatter = logging.Formatter("%(message)s") | |
handler.setFormatter(formatter) | |
handler.setLevel("DEBUG") | |
if not logger.handlers: | |
logger.addHandler(handler) | |
logger.setLevel("DEBUG") | |
def biz_logic(): | |
trace_id = "adqeT78" | |
try: | |
for i in range(0, 10): | |
# log something | |
logger.info({"trace_id": trace_id, "id": i}) | |
print("no log has appeared in output so far") | |
time.sleep(5) # simulate passage of time | |
x = {} | |
name = x["name"] # produce error | |
except KeyError as e: | |
logger.error( | |
{"trace_id": trace_id, "error_msg": str(e), "error_type": e.__class__.__name__} | |
) | |
print("this produced log of the error & all other previous logs") | |
finally: | |
logger.info({"trace_id": trace_id, "state": "end"}) | |
print("no other log output") | |
# perform biz logic | |
biz_logic() |
Author
komuw
commented
Aug 2, 2019
An interesting problem to solve(or maybe not) is;
- if a request is handled by service A, and then service B, C & D
- however, that request only experiences an error while it is in service D
- then, only the logs in service D will get flushed/persisted.
- however, it would be cool if all the breachHandlers in all the services could co-ordinate and flush together(if anyone of them has met the breach threshold)
- this is some distributed-fu stuff that you probably want to leave alone and not try and solve it
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment