Skip to content

Instantly share code, notes, and snippets.

@stef
Created July 30, 2011 13:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stef/1115526 to your computer and use it in GitHub Desktop.
Save stef/1115526 to your computer and use it in GitHub Desktop.
Samples a list of data structures and displays a statistical representation of the underlying data schema; it can also compare two composite data structures (somewhat).
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of composite data analysis tools (cdat)
# composite data analysis tools (cdat) is free software: you can
# redistribute it and/or modify it under the terms of the GNU
# Affero General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option)
# any later version.
# composite data analysis tools (cdat) is distributed in the hope
# that it will be useful, but WITHOUT ANY WARRANTY; without even
# the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License
# for more details.
# You should have received a copy of the GNU Affero General Public
# License along with composite data analysis tools (cdat). If not,
# see <http://www.gnu.org/licenses/>.
# (C) 2011 by Stefan Marsiske, <stefan.marsiske@gmail.com>
from itertools import izip_longest
from operator import itemgetter
def dump_schema(items, count=None, skip=(), title=None, format="text"):
    """
    Dump schema: takes a list of data structures and computes a
    probabilistic schema out of the samples, then prints the result
    to standard output.

    @param items iterable of samples; every sample must provide .items()
    @param count is optional; pass it in case your items list is some kind
           of cursor that has no __len__. It is the denominator used for
           the top-level percentages.
    @param skip is an optional list of keys to skip on the top structure
    @param title is the name for the data structure to be displayed
    @param format <text|full-html|html> - text is the default; full-html
           adds a js/css header and a legend; any other value is treated
           as plain html
    """
    if count is None:
        count = len(items)
    # Seed 'types' so that an empty sample set produces an empty schema
    # instead of a KeyError inside the printers.
    ax = {'types': {}}
    for item in items:
        ax = scan(dict((k, v) for k, v in item.items() if k not in skip), ax)
    if title:
        ax['name'] = title
    if format == 'text':
        print_schema(ax, 0, count)
        return
    body = '\n'.join([str(x) for x in html_schema(ax, 0, count)])
    # Single-argument print(...) behaves the same as the print statement.
    if format == 'full-html':
        print('%s<div class="schema">%s</div>' % (_html_header(), body))
    else:
        print('<div class="schema">%s</div>' % body)
def scan(d, node):
    """
    helper for dump_schema: merge one sample value into a schema node.

    The node is a plain dict that is updated in place and returned:
      'types' - maps str(type(value)) to {'count': n, 'example': value}
      'items' - for values that have keys: child node per key (each child
                carries its key under 'name')
      'elems' - for other iterables: child node per element type

    @param d the sample value to account for
    @param node the schema node accumulated so far ({} for a fresh one)
    @return the same node object, updated
    """
    node.setdefault('types', {})
    # Text and byte strings are scalars for schema purposes. type(u'') is
    # the text type; without this guard an interpreter whose strings expose
    # __iter__ would recurse forever on one-character strings.
    string_types = (bytes, type(u''))
    if hasattr(d, 'keys'):
        # Create 'items' before looping so that an empty mapping still gets
        # it; the printers index node['items'] for every dict-typed node.
        children = node.setdefault('items', {})
        for k, v in d.items():
            children[k] = scan(v, children.setdefault(k, {'name': k}))
    elif hasattr(d, '__iter__') and not isinstance(d, string_types):
        elems = node.setdefault('elems', {})
        for v in d:
            elem_type = str(type(v))
            elems[elem_type] = scan(v, elems.get(elem_type, {}))
    type_name = str(type(d))
    stats = node['types'].get(type_name, {'count': 0, 'example': d})
    stats['count'] += 1
    # Prefer a truthy example: replace an empty/zero one once we see better.
    if d and not stats['example']:
        stats['example'] = d
    node['types'][type_name] = stats
    return node
def print_schema(node,indent,parent):
""" helper for dump_schema"""
for k,v in sorted(node['types'].items(),key=lambda x: x[1]['count'],reverse=True):
print "{0:>3}".format(int(float(v['count'])/parent*100)), ' '*indent, node.get('name',''), k,
if k=="<type 'list'>":
print ''
for x in node['elems'].values():
print_schema(x,indent+1,v['count'])
elif k=="<type 'dict'>":
print ''
for x in node['items'].values():
print_schema(x,indent+1,v['count'])
elif k=="<type 'unicode'>":
print v['example'].encode('utf8')
else:
print v['example']
# HTML template for one schema entry. Positional fields:
#   {0} percentage, {1} node name, {2} type label, {3} inner html,
#   {4} grey level used for all three colour channels (two hex digits each),
#   {5} css class of the inner div
schematpl = "<dl style='background-color: #{4:02x}{4:02x}{4:02x};'><dt>{1}</dt><dd> <span class='type'>{2}</span> <span class='p'>({0}%)</span><div class='{5}'>{3}</div></dd></dl>"


def html_schema(node, indent, parent):
    """
    helper for dump_schema: render a schema node as html fragments.

    @param node schema node as built by scan
    @param indent nesting depth; only passed on to the recursive calls
    @param parent number of samples of the enclosing structure; used as the
           denominator of the percentage and of the background shade
    @return list with one html string per type seen at this node, most
            frequent first

    NOTE: names and example values are interpolated into the markup as-is,
    without html escaping.
    """
    res = []
    by_count = sorted(node['types'].items(), key=lambda x: x[1]['count'], reverse=True)
    for k, v in by_count:
        # The literals below are the str(type(...)) spellings that scan
        # stores as keys of node['types'].
        if k == "<type 'list'>":
            label = 'List'
            # .get: tolerate a node without children instead of KeyError
            children = node.get('elems', {}).values()
            data = "<ul>{0}</ul>".format(''.join(["<li>{0}</li>".format(y) for x in children for y in html_schema(x, indent+1, v['count'])]))
            clss = 'contents'
        elif k == "<type 'dict'>":
            label = 'Dict'
            # .get: dicts that were empty in every sample have no 'items'
            children = node.get('items', {}).values()
            data = "<ul>{0}</ul>".format(''.join(["<li>{0}</li>".format(y) for x in children for y in html_schema(x, indent+1, v['count'])]))
            clss = 'contents'
        elif k == "<type 'unicode'>":
            label = 'unicode'
            data = "Example: {0}".format(v['example'].encode('utf8'))
            clss = 'example'
        else:
            # strip the "<type '" prefix and "'>" suffix
            label = k[7:-2]
            data = "Example: {0}".format(v['example'])
            clss = 'example'
        ratio = float(v['count']) / parent
        # More frequent entries get a darker background. Clamp to 255:
        # for ratios below 1/64 the subtraction leaves 256, which would be
        # formatted as three hex digits per channel and give an invalid
        # colour.
        grey = min(255, 256 - int(64 * min(1.0, ratio)))
        res.append(schematpl.format(int(ratio * 100),
                                    node.get('name', '[0]'),
                                    label,
                                    data,
                                    grey,
                                    clss,
                                    ))
    return res
def _html_header():
""" helper for html_schema"""
return """
<style>
dt { display: inline; cursor: pointer; color: #288; }
dd { display: inline; margin-left: 0;}
dl { margin-top: .4em; }
ul { list-style: none; }
.contents, .example { margin-left: 2em; background-color: white}
.type { font-style: italic }
.p { font-size: .8em }
.schema-legend { font-size: .8em; font-style: italic; }
</style>
<script type="text/javascript" src="http://code.jquery.com/jquery-1.6.2.js"> </script>
<script type="text/javascript">
$(document).ready(function() {
$('div.contents').hide();
$('.schema > dl > dd > div.contents').show();
$('dt').click(function() {
$(this).parent().find('div.contents:first').toggle();
});
});
</script>
<div class="schema-legend">Click on the names to fold/expand levels. Percentages show probability of this field appearing under it's parent. In case of lists, percentage also shows average length of list.</div>
"""
def diff(old, new, path=None):
    """
    a handy comparison function for composite data structures

    Walks old and new in parallel and reports the differences as records
    of the form {'type': 'added'|'deleted'|'changed', 'data': ..., 'path': [...]}.
    'data' is the new value for 'added', the old value for 'deleted' and the
    tuple (old, new) for 'changed'.

    Things to be aware of:
      - None means "missing": a key mapped to None is indistinguishable
        from an absent key.
      - byte strings are decoded as utf8 before comparing.
      - sequences are aligned from their ends; the path component of a pair
        is min(len(old), len(new)) minus its distance from the end, so the
        last pair gets min(len) and unmatched leading elements get 0 or
        negative numbers.

    @param old the original structure
    @param new the structure to compare against
    @param path list of keys/indexes leading to old/new; used by the recursion
    @return list of difference records for containers (empty if equal);
            for scalars a one element list, or None if they are equal
    """
    # A fresh list per call: the path ends up inside the returned records,
    # so a shared default list would let callers corrupt later results.
    if path is None:
        path = []
    if old is None and new is not None:
        return [{'type': 'added', 'data': new, 'path': path}]
    elif new is None and old is not None:
        return [{'type': 'deleted', 'data': old, 'path': path}]
    # normalise byte strings to text so both sides compare alike
    if type(old) is bytes:
        old = old.decode('utf8')
    if type(new) is bytes:
        new = new.decode('utf8')
    if type(old) is not type(new):
        return [{'type': 'changed', 'data': (old, new), 'path': path}]
    if hasattr(old, 'keys'):
        res = []
        for k in set(old.keys()) | set(new.keys()):
            r = diff(old.get(k), new.get(k), path + [k])
            if r:
                res.extend(r)
        return res
    # text is a scalar even where strings expose __iter__
    if hasattr(old, '__iter__') and not isinstance(old, type(u'')):
        shorter = min(len(old), len(new))
        res = []
        # izip_longest pads the shorter side with None, which the recursion
        # reports as added/deleted
        for i, (a, b) in enumerate(izip_longest(reversed(old), reversed(new))):
            r = diff(a, b, path + [shorter - i])
            if r:
                res.extend(r)
        return res
    if old != new:
        return [{'type': 'changed', 'data': (old, new), 'path': path}]
    return None
def printdict(d):
    """ helper function for formatdiff: renders a value as html.
    Lists become <ul> lists, dicts become a sequence of <dl> entries (the
    keys 'mepref' and 'comref' are left out), anything else is converted
    to text. Only exact list/dict instances are expanded. """
    kind = type(d)
    if kind is list:
        entries = ["<li>%s</li>" % printdict(v) for v in d]
        return u'<ul>%s</ul>' % '\n'.join(entries)
    if kind is not dict:
        # one-element tuple so that tuples are formatted as a single value
        return u'%s' % (d,)
    hidden = ('mepref', 'comref')
    # the leading empty entry makes a non-empty result start with a newline
    rows = ['']
    for k, v in d.items():
        if k in hidden:
            continue
        rows.append(u"<dl><dt>%s</dt><dd>%s</dd></dl>" % (k, printdict(v)))
    return u'\n'.join(rows)
def formatdiff(data):
    """
    formats diffs to html

    @param data list of difference records as returned by diff; None (which
           diff returns for equal scalars) is treated as "no differences"
    @return an html table with one row per record, ordered by path, with
            the columns type, change in, new, old
    """
    res = []
    # Sort once, by path. A second key-less sort over the records would
    # re-order them by comparing the dicts themselves and discard the path
    # ordering.
    for di in sorted(data or [], key=itemgetter('path')):
        # u'%s' instead of str(): path components may be non-ascii text
        where = u'/'.join([u'%s' % (x,) for x in di['path']])
        if di['type'] == 'changed':
            # data is (old, new); the table shows new before old
            res.append(u'<tr><td>change</td><td>%s</td><td>%s</td><td>%s</td></tr>' % (where, printdict(di['data'][1]), printdict(di['data'][0])))
        elif di['type'] == 'deleted':
            res.append(u"<tr><td>%s</td><td>%s</td><td></td><td>%s</td></tr>" % (di['type'], where, printdict(di['data'])))
        elif di['type'] == 'added':
            res.append(u"<tr><td>%s</td><td>%s</td><td>%s</td><td></td></tr>" % (di['type'], where, printdict(di['data'])))
    return "<table><thead><tr width='90%%'><th>type</th><th>change in</th><th>new</th><th>old</th></tr></thead><tbody>%s</tbody></table>" % '\n'.join(res)
def test_diff():
    """ demo: diffs two sample structures, then prints the raw difference
    records followed by their html rendering """
    import pprint
    before = {
        'a': [
            {'aa': 1, 'bb': 3},
            {'AA': 1, 'BB': {'asdf': '2'}},
            {'Mm': ['a', 'b', 'c', 'd']},
        ],
        'b': {'z': 9, 'x': 8},
        'c': [1, 2, 3, 4],
    }
    after = {
        'a': [
            {'aa': 2, 'bb': 3},
            {'aa': 1, 'bb': 3},
            {'AA': 1, 'BB': {'asdf': {'asdf': 'qwer'}}},
            {'Mm': ['a', 'b', 'c', 'd']},
        ],
        'c': [0, 1, 2, 3, 4],
    }
    changes = diff(before, after)
    pprint.pprint(changes)
    print(formatdiff(changes))
def test_dump(html_only=False):
    """ don't try this at home. it's an example, of how you can get a glimpse on some nosql collection

    Dumps the schema of four collections of the 'parltrack' database: first
    as text (skipped when html_only is set), then as html.
    """
    import pymongo
    db = pymongo.Connection().parltrack
    dossiers = db.dossiers
    meps = db.ep_meps
    votes = db.ep_votes
    meetings = db.ep_com_meets
    if not html_only:
        # text dumps; the first three only sample 100 documents each
        dump_schema(dossiers.find()[:100], 100, ['changes'], 'Dossiers')
        dump_schema(meps.find()[:100], 100, title="MEPs")
        dump_schema(votes.find()[:100], 100, title='Votes')
        dump_schema(meetings.find(), meetings.find().count(), title='Committee Meetings')
    # html dumps over the complete collections; only the first one emits
    # the css/js header. (For a quicker run, sample with find()[:100] and a
    # count of 100 as in the text dumps above.)
    html_jobs = (
        (dossiers, ['changes'], 'Dossiers', 'full-html'),
        (meps, [], "MEPs", 'html'),
        (votes, [], 'Votes', 'html'),
        (meetings, [], 'Committee Meetings', 'html'),
    )
    for collection, skipped, title, fmt in html_jobs:
        dump_schema(collection.find(), collection.find().count(), skipped, title, format=fmt)
# Script entry point: run the diff demo. The schema dump demo is left
# commented out; it imports pymongo and opens a database connection.
if __name__ == "__main__":
    test_diff()
    #test_dump(html_only=True)
@stef
Copy link
Author

stef commented Jul 30, 2011

for an example of the output see this:
http://parltrack.euwiki.org/static/cdat.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment