Skip to content

Instantly share code, notes, and snippets.

@vsajip
Created April 29, 2012 09:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vsajip/2549044 to your computer and use it in GitHub Desktop.
Save vsajip/2549044 to your computer and use it in GitHub Desktop.
Introspection of the logging hierarchy
#
# Copyright (C) 2012 Vinay Sajip. Licensed under the MIT license.
#
from collections import namedtuple
import logging
import re
class Snapper(object):
    """Take a plain-data snapshot of the live ``logging`` hierarchy.

    Loggers, placeholders, handlers, filters and formatters are converted
    into namedtuples whose first field, ``class_``, holds the dotted class
    name of the source object.  Which attributes are copied for which class
    is driven by a configuration dict (see ``DEFAULT_CONFIG``).
    """

    # 'attrmap' maps a class key to a space-separated list of attributes to
    # copy; an entry written ``source:target`` copies attribute ``source``
    # into the namedtuple field ``target``.
    # 'format' maps a class key to a callable that produces the converted
    # value directly, bypassing the namedtuple machinery.
    DEFAULT_CONFIG = {
        'attrmap': {
            'TextIOWrapper': 'name mode encoding',
            'file': 'name mode encoding',
            'logging.PlaceHolder': 'children',
            'logging.Formatter': 'datefmt _fmt:format _style:style',
            'logging.Filter': 'name',
            'logging.RootLogger': 'filters handlers level propagate children',
            'logging.Logger': 'disabled filters handlers level name '
                              'propagate children',
            'logging.Handler': 'filters formatter level',
            'logging.StreamHandler': 'filters formatter level stream',
            'logging.FileHandler': 'filters formatter level '
                'baseFilename:filename mode encoding delay',
            'logging.handlers.WatchedFileHandler': 'filters formatter level '
                'baseFilename:filename mode encoding delay',
            'logging.handlers.RotatingFileHandler': 'filters formatter level '
                'baseFilename:filename mode encoding maxBytes backupCount '
                'delay',
            'logging.handlers.TimedRotatingFileHandler': 'filters formatter '
                'level baseFilename:filename mode encoding when interval '
                'backupCount delay utc',
            'logging.handlers.SocketHandler': 'filters formatter level host '
                'port closeOnError retryStart retryMax retryFactor',
            'logging.handlers.DatagramHandler': 'filters formatter level host '
                'port closeOnError',
            'logging.handlers.SysLogHandler': 'filters formatter level address '
                'facility socktype',
            'logging.handlers.SMTPHandler': 'filters formatter level mailhost '
                'mailport fromaddr toaddrs username password subject secure '
                'timeout',
            'logging.handlers.NTEventLogHandler': 'filters formatter level '
                'appname dllname logtype',
            'logging.handlers.HTTPHandler': 'filters formatter level host '
                'url method secure credentials',
            'logging.handlers.BufferingHandler': 'filters formatter level '
                'capacity',
            'logging.handlers.MemoryHandler': 'filters formatter level '
                'capacity flushLevel target',
            'logging.handlers.QueueHandler': 'filters formatter level',
        },
        'format': {
            'logging.PercentStyle': lambda x: '%',
            'logging.StrFormatStyle': lambda x: '{',
            'logging.StringTemplateStyle': lambda x: '$',
        },
    }

    LEADING_UNDERSCORE = re.compile('^_')

    # Base classes tried, in this order, when an object's exact class key has
    # no attrmap entry of its own.
    _FALLBACK_BASES = (
        ('logging.Handler', logging.Handler),
        ('logging.Filter', logging.Filter),
        ('logging.Logger', logging.Logger),
        ('logging.Formatter', logging.Formatter),
    )

    class Context(object):
        """Per-snapshot working state derived from a configuration dict.

        Attributes:
            converted: cache mapping already-converted objects to results.
            typemap: class key -> namedtuple type used for that class.
            attrmap: class key -> {source attribute: namedtuple field}.
            format: class key -> callable returning the converted value.
        """

        def __init__(self, config):
            self.converted = {}
            self.typemap = dict(config.get('typemap', {}))
            self.attrmap = attrmap = dict(config.get('attrmap', {}))
            self.format = dict(config.get('format', {}))
            # Expand string specs into dicts.  Only the values of this
            # shallow copy are replaced, so the caller's config is untouched.
            for key in attrmap:
                spec = attrmap[key]
                if isinstance(spec, dict):
                    continue
                mapping = {}
                for attr in spec.split():
                    source, sep, target = attr.partition(':')
                    mapping[source] = target if sep else source
                attrmap[key] = mapping

    def __init__(self, config=None):
        """Use ``config`` if it is truthy, otherwise ``DEFAULT_CONFIG``."""
        self.config = config or self.DEFAULT_CONFIG

    def snapshot(self):
        """Return a dict mapping logger names to converted loggers.

        The root logger is stored under the empty-string key.  The logging
        module lock is held while the hierarchy is read.
        """
        ctx = self.Context(self.config)
        # The private logging._acquireLock()/_releaseLock() helpers are not
        # available on every Python version (newer releases dropped them),
        # whereas the module-level lock object itself is.  It may be absent
        # or None on builds without thread support.
        lock = getattr(logging, '_lock', None)
        if lock is not None:
            lock.acquire()
        try:
            # Everything after the acquire lives inside the try block so that
            # the lock is released even if reading the hierarchy fails.
            self.root = logging.Logger.manager.root
            loggers = dict(logging.Logger.manager.loggerDict)
            self.logger_items = sorted(loggers.items())
            result = {'': self.convert(ctx, self.root)}
            for name, item in self.logger_items:
                result[name] = self.convert(ctx, item)
        finally:
            if lock is not None:
                lock.release()
        return result

    def get_children(self, logger_or_placeholder):
        """Return ``(name, object)`` pairs for the direct children.

        Relies on ``self.root`` and ``self.logger_items`` as set up by
        ``snapshot()``.  Children of the root are the entries whose name
        contains no dot; children of any other entry are those named
        ``<parent>.<segment>`` with a single extra segment.
        """
        items = self.logger_items
        if logger_or_placeholder is self.root:
            return [(k, v) for k, v in items if '.' not in k]
        for index, (name, candidate) in enumerate(items):
            if candidate is logger_or_placeholder:
                break
        else:
            # Not part of the captured hierarchy: no children to report.
            return []
        prefix = name + '.'
        start = len(prefix)
        # The items are sorted, so descendants can only follow their parent.
        # The remainder is checked from the first character after the
        # separator, so a name such as 'a..b' belongs to 'a.' only.
        return [(k, v) for k, v in items[index + 1:]
                if k.startswith(prefix) and '.' not in k[start:]]

    def _attrs_for(self, context, key, value):
        """Return the {attribute: field} mapping to use for ``value``."""
        attrmap = context.attrmap
        if key in attrmap:
            return attrmap[key]
        for base_key, base in self._FALLBACK_BASES:
            if isinstance(value, base) and base_key in attrmap:
                return attrmap[base_key]
        # Unknown type: copy every instance attribute, dropping one leading
        # underscore from the field name.  Objects without a __dict__ (for
        # example classes using __slots__) simply contribute no attributes
        # instead of raising AttributeError.
        attrs = {}
        for name in getattr(value, '__dict__', ()):
            attrs[name] = self.LEADING_UNDERSCORE.sub('', name)
        attrmap[key] = attrs
        return attrs

    def convert(self, context, value):
        """Convert ``value`` into plain data.

        * dicts become dicts with converted keys and values;
        * lists and tuples become tuples of converted items;
        * ``None``, ints, floats, strings and bytes are returned unchanged;
        * anything else becomes a namedtuple (or whatever the matching
          ``format`` callable returns), cached per object in ``context``.
        """
        cnv = self.convert
        if isinstance(value, dict):
            return dict(cnv(context, item) for item in value.items())
        if isinstance(value, (list, tuple)):
            return tuple(cnv(context, item) for item in value)
        if value is None or isinstance(value, (int, float, str, bytes)):
            return value

        if not hasattr(value, '__module__'):
            key = value.__class__.__name__
        else:
            key = '%s.%s' % (value.__module__, value.__class__.__name__)
        if key in context.format:
            return context.format[key](value)
        if value in context.converted:
            return context.converted[value]

        attrs = self._attrs_for(context, key, value)
        if key in context.typemap:
            cls = context.typemap[key]
        else:
            fields = ' '.join(attrs.values())
            cn = '%sInfo' % key.replace('.', '_')
            prefix = 'class_ '
            cls = namedtuple(cn, prefix + fields)
            context.typemap[key] = cls

        data = {}
        did_children = False
        if isinstance(value, (logging.Logger, logging.PlaceHolder)):
            # 'children' is not a real attribute of these objects; it is
            # computed from the sorted name list captured by snapshot().
            data['children'] = cnv(context, self.get_children(value))
            did_children = True
        for source, field in attrs.items():
            if source == 'children' and did_children:
                continue
            data[field] = cnv(context, getattr(value, source, None))
        result = cls(class_=key, **data)
        context.converted[value] = result
        return result
def get_snapshot(config=None):
    """Build a ``Snapper`` from ``config`` and return its snapshot."""
    snapper = Snapper(config)
    return snapper.snapshot()
#
# Copyright (C) 2012 Vinay Sajip. Licensed under the MIT license.
#
import logging
from logging.config import dictConfig
from logging_tree.format import build_description
from logtree import get_snapshot
import sys
logger = logging.getLogger(__name__)

# Sample dictConfig() configuration used to populate the logging hierarchy
# before the two tree descriptions are compared.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
        },
        'simple': {
            'format': '%(levelname)s %(message)s',
        },
    },
    'filters': {
        'special': {
            '()': 'logging.Filter',
            'name': 'foobar',
        },
    },
    'handlers': {
        'null': {
            'level': 'DEBUG',
            'class': 'logging.NullHandler',
        },
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'logging.handlers.SMTPHandler',
            'mailhost': 'mymailhost',
            'fromaddr': 'me@example.com',
            'toaddrs': ['you@example.com'],
            'subject': 'Pay attention to this',
            'filters': ['special'],
        },
    },
    'loggers': {
        'django': {
            'handlers': ['null'],
            'propagate': True,
            'level': 'INFO',
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom.subproject': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special'],
        },
    },
}
def describe_filter(f):
    """Return a one-line description of the converted filter ``f``.

    Plain ``logging.Filter`` entries are shown by name; anything else falls
    back to its ``repr``.
    """
    is_plain_filter = f.class_ == 'logging.Filter'
    return 'name=%r' % f.name if is_plain_filter else repr(f)
# %-style templates, keyed by dotted handler class name, used to render the
# first description line of a converted handler.
handler_formats = {  # Someday we will switch to .format() when Py2.6 is gone.
    'logging.StreamHandler': 'Stream %(stream)r',
    'logging.FileHandler': 'File %(filename)r',
    'logging.handlers.RotatingFileHandler':
        'RotatingFile %(filename)r'
        ' maxBytes=%(maxBytes)r backupCount=%(backupCount)r',
    'logging.handlers.SocketHandler': 'Socket %(host)s %(port)r',
    'logging.handlers.DatagramHandler': 'Datagram %(host)s %(port)r',
    'logging.handlers.SysLogHandler':
        'SysLog %(address)r facility=%(facility)r',
    'logging.handlers.SMTPHandler': 'SMTP via %(mailhost)s to %(toaddrs)s',
    'logging.handlers.HTTPHandler':
        'HTTP %(method)s to http://%(host)s/%(url)s',
    'logging.handlers.BufferingHandler': 'Buffering capacity=%(capacity)r',
    'logging.handlers.MemoryHandler':
        'Memory capacity=%(capacity)r dumping to:',
}

# Entries registered only when the interpreter version is at least the one
# named in the check.
if sys.version_info >= (2, 5):
    handler_formats['logging.handlers.TimedRotatingFileHandler'] = (
        'TimedRotatingFile %(filename)r when=%(when)r'
        ' interval=%(interval)r backupCount=%(backupCount)r'
    )

if sys.version_info >= (2, 6):
    handler_formats['logging.handlers.WatchedFileHandler'] = (
        'WatchedFile %(filename)r'
    )
def describe_handler(h):
    """Return a list of lines describing the converted handler ``h``.

    The first line comes from the ``handler_formats`` template for the
    handler's class, or ``repr(h)`` when there is none.  It is followed by
    one line per filter and, for a memory handler with a target, by the
    nested description of that target.
    """
    # NOTE(review): the one-space indents below may have been wider in the
    # original file (whitespace looks collapsed) -- confirm before relying
    # on exact column alignment.
    kind = h.class_
    template = handler_formats.get(kind)
    headline = repr(h) if template is None else template % h._asdict()
    lines = [headline]
    lines.extend(' Filter %s' % describe_filter(flt)
                 for flt in getattr(h, 'filters', ()))
    if kind == 'logging.handlers.MemoryHandler' and h.target is not None:
        target_lines = describe_handler(h.target)
        lines.append(' Handler %s' % target_lines[0])
        lines.extend(' ' + line for line in target_lines[1:])
    return lines
def hierarchy_level(name):
    """Return the depth of the dotted logger ``name``.

    The empty name (the root logger) is level 0; every other name counts one
    level per dot-separated segment.
    """
    if name == '':
        return 0
    # One more segment than there are separators.
    return name.count('.') + 1
def describe(name, logger):
    """Return the lines of a text tree rooted at the converted ``logger``.

    Args:
        name: the logger's full dotted name ('' for the root logger).
        logger: converted logger or placeholder; must expose ``class_`` and
            ``children`` and, unless it is a placeholder, ``level`` and
            ``propagate`` (``filters`` and ``handlers`` are optional).

    Returns:
        A list of strings: the heading line first, then detail lines, then
        the indented descriptions of every child.

    Every indentation unit is three columns wide so that detail lines and
    child connectors line up under the three-character '<--' arrow, and the
    two continuation prefixes used for child lines have equal width.
    """
    indent = '   '
    result = []
    is_placeholder = logger.class_ == 'logging.PlaceHolder'
    if is_placeholder or logger.propagate:
        arrow = '<--'
    else:
        # Same width as the arrow, so names stay in the same column.
        arrow = indent
    if is_placeholder:
        label = '[%s]' % name
    else:
        label = '"%s"' % name
    result.append(arrow + label)

    if not is_placeholder:
        if logger.level:
            result.append(indent + 'Level ' +
                          logging.getLevelName(logger.level))
        if not logger.propagate:
            result.append(indent + 'Propagate OFF')
        for f in getattr(logger, 'filters', ()):
            result.append(indent + 'Filter %s' % describe_filter(f))
        for h in getattr(logger, 'handlers', ()):
            handler_lines = describe_handler(h)
            result.append(indent + 'Handler %s' % handler_lines[0])
            for line in handler_lines[1:]:
                result.append(indent + line)

    children = logger.children
    if children:
        last_index = len(children) - 1
        for index, (child_name, child) in enumerate(children):
            child_lines = describe(child_name, child)
            result.append(indent + '|')
            result.append(indent + 'o' + child_lines[0])
            # Keep the vertical rule going only while more siblings follow.
            if index == last_index:
                prefix = indent + ' '
            else:
                prefix = indent + '|'
            for line in child_lines[1:]:
                result.append(prefix + line)
    return result
def format_like_logging_tree(d):
    """Render snapshot ``d`` as a newline-joined tree, starting at ``d['']``."""
    return '\n'.join(describe('', d['']))
def main():
    """Configure logging, then compare the two tree descriptions.

    The description produced by ``build_description()`` (minus its final
    character) is written to ``ltree1.txt`` and the one built from
    ``get_snapshot()`` to ``ltree2.txt``.  When they differ, matching lines
    are echoed until the first mismatch, which is then reported.  Any
    exception is logged and printed rather than propagated.
    """
    try:
        dictConfig(LOGGING)
        s1 = build_description()[:-1]
        s2 = format_like_logging_tree(get_snapshot())
        with open('ltree1.txt', 'w') as f:
            f.write(s1)
        with open('ltree2.txt', 'w') as f:
            f.write(s2)
        if s1 != s2:
            lines1 = s1.splitlines()
            lines2 = s2.splitlines()
            for lineno, (line1, line2) in enumerate(zip(lines1, lines2)):
                if line1 != line2:
                    print('Mismatch at line %d:\nExpected: %r\n'
                          'Got : %r\n' % (lineno + 1, line1, line2))
                    break
                print(line1.rstrip())
            else:
                # zip() stops at the shorter side, so when one output is a
                # strict prefix of the other the loop finds nothing; report
                # the difference in length instead of staying silent.
                if len(lines1) != len(lines2):
                    common = min(len(lines1), len(lines2))
                    print('Mismatch at line %d: expected %d line(s), '
                          'got %d' % (common + 1, len(lines1), len(lines2)))
    except Exception as e:
        # Top-level boundary of the script: record the traceback in the log
        # file and show a short message on the console.
        logger.exception('Unable to do comparison')
        print('Unable to do comparison: %s' % e)
# Script entry point: send log records to logtree.log, then run the comparison.
if __name__ == '__main__':
    logging.basicConfig(
        filename='logtree.log',
        level=logging.DEBUG,
        format='%(name)s %(funcName)s %(message)s',
    )
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment