Created
October 5, 2015 19:37
-
-
Save anthonyrisinger/efbbfe11472feb9da9da to your computer and use it in GitHub Desktop.
logbook/logstash formatter and stackable context dicts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf8 | |
from __future__ import absolute_import | |
from __future__ import print_function | |
import datetime | |
import itertools | |
import json | |
import sys | |
import threading | |
import logbook | |
from logbook.helpers import cached_property | |
# TODO: cffi lib to get linux thread_id via syscall | |
class ThreadsafeCounter(object):
    """Thread-safe counter that can stop or wrap around at a bound.

    Values are produced by ``itertools.count(start, step)``. When ``stop``
    is given it is an exclusive bound in the direction of ``step``: as soon
    as the next value reaches it, the counter either raises
    ``StopIteration`` (``ring=False``) or starts over from ``start``
    (``ring=True``).

    The instance is its own iterator (Python 2 and 3 protocols) and is also
    callable; calling it returns the next value, or ``None`` once exhausted.

    :param start: first value produced.
    :param stop: exclusive bound, or ``None`` for an unbounded counter.
    :param step: increment between values; may be negative.
    :param ring: restart from ``start`` instead of stopping at ``stop``.
    """

    def __init__(self, start=0, stop=None, step=1, ring=False):
        self.start = start
        self.stop = stop
        self.step = step
        self.ring = ring
        self.lock = threading.Lock()
        # underlying itertools.count; built lazily by the ``counter`` property
        self._counter = None

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next value; raise ``StopIteration`` when exhausted."""
        # The lock covers lazy creation, advancing and resetting, so
        # concurrent callers never observe a half-reset counter.
        with self.lock:
            count = next(self.counter)
            if self._reached_stop(count):
                if not self.ring:
                    raise StopIteration
                # wrap around: drop the spent iterator and start over
                del self.counter
                # the first value after a reset is returned unchecked, so a
                # ring whose start is already at the bound still yields start
                count = next(self.counter)
            return count

    def __call__(self):
        """Return the next value, or ``None`` instead of raising."""
        try:
            return self.__next__()
        except StopIteration:
            return None

    def next(self):
        """Python 2 spelling of :meth:`__next__`."""
        return self.__next__()

    def _reached_stop(self, count):
        """Tell whether ``count`` hit the bound in the direction of travel."""
        if self.stop is None:
            return False
        if self.step < 0:
            # counting down: the bound is approached from above
            return count <= self.stop
        return count >= self.stop

    @property
    def counter(self):
        """Underlying ``itertools.count``, created on first access.

        Deleting the attribute discards the iterator, so the next access
        builds a fresh one from the current ``start`` and ``step``.
        """
        if getattr(self, '_counter', None) is None:
            self._counter = itertools.count(self.start, self.step)
        return self._counter

    @counter.setter
    def counter(self, value):
        self._counter = value

    @counter.deleter
    def counter(self):
        self._counter = None
class LogstashFormatter(object):
    """Logbook -> Logstash

    Callable formatter: assign an instance to a handler's ``formatter``.
    Each call turns one record into a compact JSON object string (sorted
    keys, no whitespace) carrying the ``@timestamp``/``@version`` fields.
    """

    # process-wide sequence number: a class attribute, so it is shared by
    # every formatter instance; cycles through 0..999
    seq = ThreadsafeCounter(start=0, stop=1000, step=1, ring=True)

    def __call__(self, record, handler):
        """Serialize ``record`` to a JSON string.

        :param record: object exposing ``to_dict(json_safe=True)``
            (presumably a ``logbook.LogRecord`` -- confirm against callers).
        :param handler: the handler doing the formatting; unused here.
        :returns: JSON text of the cleaned-up record dict.
        """
        # get seq immediately to better highlight problems
        seq = next(self.seq)

        clean = record.to_dict(json_safe=True)
        clean['seq'] = seq

        # message = msg.format(*args, **kwargs)
        for raw_key in ('msg', 'args', 'kwargs'):
            clean.pop(raw_key, None)

        if 'message' in clean:
            # if it doesn't look like a string, assume update-compatible
            #   logger.info({key: value})
            if not hasattr(clean['message'], 'format'):
                clean.update(clean.pop('message'))

        # (donotwant)
        for unwanted in ('frame_correction', 'heavy_initialized',
                         'information_pulled'):
            clean.pop(unwanted, None)

        # a greenlet id equal to the thread id adds nothing
        if clean.get('greenlet') == clean.get('thread'):
            clean.pop('greenlet', None)

        # Iterate over a snapshot: keys are removed inside the loop, and
        # deleting from a dict while iterating its live items() view raises
        # RuntimeError on Python 3.
        for key, value in list(clean.items()):
            if value:
                continue
            # ruh roh! only allow numericals through (includes bools)
            try:
                value + 0
            except TypeError:
                del clean[key]

        # last chance before ship
        if 'time' in clean:
            # promote logbook's ``time`` to ``@timestamp`` unless one was
            # supplied explicitly, then drop the original key
            if '@timestamp' not in clean:
                clean['@timestamp'] = clean['time']
            clean.pop('time')
        if '@version' not in clean:
            clean['@version'] = 1
        if '@timestamp' not in clean:
            clean['@timestamp'] = datetime.datetime.utcnow().isoformat() + 'Z'
        if 'level' in clean:
            clean['level_name'] = logbook.get_level_name(clean['level'])

        # TODO: why?
        # if 'message' not in clean:
        #     clean['message'] = ''

        # commit!
        clean_str = json.dumps(clean, sort_keys=True, separators=(',', ':'))
        return clean_str
class AtomProcessor(logbook.Processor):
    """Processor with dict-style access that injects context into records.

    Key/value pairs stored on the processor (``atom[key] = value``) are
    copied onto every record it processes.
    """

    def process(self, record):
        """Copy the namespace onto ``record`` without overwriting.

        ``setdefault`` leaves alone any attribute the record already has.
        """
        # items() exists on both Python 2 and 3; iteritems() is Python 2
        # only and raises AttributeError on Python 3.
        for key, value in self.namespace.items():
            record.__dict__.setdefault(key, value)

    @cached_property
    def namespace(self):
        """Per-instance dict of injected values, created on first access."""
        return {}

    # --- dict-style access, delegated to ``namespace`` ---

    def __len__(self):
        return len(self.namespace)

    def __iter__(self):
        return iter(self.namespace)

    def __getitem__(self, key):
        return self.namespace[key]

    def __setitem__(self, key, value):
        self.namespace[key] = value

    def __delitem__(self, key):
        del self.namespace[key]
class Logger(logbook.Logger):
    """``logbook.Logger`` that hands out context processors via ``atom``."""

    # Every attribute access builds a brand-new, empty AtomProcessor, so
    # keep a reference (e.g. ``with logger.atom as atom:``) to add values.
    atom = property(
        lambda self: AtomProcessor(),
        doc="A new, empty AtomProcessor on each access.",
    )
# Send every record to stdout, rendered by LogstashFormatter.
handler = logbook.StreamHandler(sys.stdout)
handler.formatter = LogstashFormatter()
# NOTE: runs at import time, so importing this module installs the handler.
handler.push_application()

# Module-level logger; its ``atom`` property supplies context processors.
logger = Logger(__name__)
if __name__ == '__main__':
    # Demo: log with no context, then with one and two stacked atoms.
    logger.info('== ONE ==')

    with logger.atom.threadbound() as outer:
        logger.info('== TWO ==')
        outer['ZZZeeeeeee'] = 'okaaaaaaaay'
        outer['HA_HA_HA_HA'] = 12341234

        with logger.atom as inner:
            inner['hitherererererer'] = ':)'
            # positional and keyword format arguments
            logger.info('== THREE ({wow} {}) ==', 'way', wow='mmk')
            # a dict message is merged into the record by the formatter
            logger.info({'q_______g':1.1})

            del inner['hitherererererer']
            logger.info('== THREE.3 ==')

            # same key as the outer atom, different value
            inner['HA_HA_HA_HA'] = 'RUH ROH!!!!!!!!!'
            logger.info('== FOUR ==')

        logger.info('== FIVE ==')
        # a falsy numeric value must still make it into the output
        outer['HA_HA_HA_HA'] = 0
        logger.info('== SIX ==')

    logger.info('== SEVEN ==')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment