-
-
Save xlfe/fe28ad7782432623f810 to your computer and use it in GitHub Desktop.
Scans for all .xml files in subdirectories and converts them to one big flat format, outputted to stdout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# flatten.py
# Written by Felix Barbalet (Pivotal Analytics Pty Ltd)
# Scans for all .xml files in subdirectories and converts them to one big flat format, written to stdout
import os,sys | |
import xmltodict | |
from collections import OrderedDict | |
import json | |
# Module-level accumulators shared by the processing and output stages
headers = []    # column names in the order they are encountered (repeats included)
plans = []      # one flat {column: value} dict per processed xml file
xml_files = []  # paths of the xml files found below the current directory

# Debugging options
LIMIT = None   # Set this to a positive number to limit the number of xml files processed
DEBUG = False  # Set this to True to suppress output of the flat file at the end of the process

# Walk subdirectories collecting xml files; any path starting with './.'
# (hidden entries directly under the current directory) is ignored.
for path, dirs, files in os.walk('./'):
    if not path.startswith('./.'):
        for f in files:
            if f.endswith('.xml'):
                xml_files.append(os.path.join(path, f))
# | |
# | |
# Some misc helper functions | |
def flatten_xml(fn):
    """Parse the xml file at path *fn* with xmltodict and return the result.

    The file is read inside a ``with`` block so the handle is closed as soon
    as its contents have been read, instead of staying open until garbage
    collection (this function runs once per xml file found).
    """
    with open(fn, 'r') as _file:
        return xmltodict.parse(_file.read())
def remove_at(i):
    """Return *i* without its first character when that character is '@' or '#'.

    Any other string (including the empty string) is returned unchanged.
    """
    return i[1:] if i.startswith(('@', '#')) else i
# Renaming steps; parse_items applies them one after another, in this order.
rename_map = [
    # drop a leading '@' or '#'
    remove_at,
    # shorten two long element names
    lambda name: name.replace('HospitalCover', 'Hospital'),
    lambda name: name.replace('WaitingPeriods', 'Waiting'),
]
def parse_items(t):
    """Return *t* after passing it through every function in rename_map, in order."""
    renamed = t
    for step in rename_map:
        renamed = step(renamed)
    return renamed
def t_to_dict(t_list):
    """Build a dict from a list of (key, value) tuples.

    Tuples that contain the element 'xmlns' are dropped.

    Raises:
        AssertionError: if the first element of *t_list* is not a tuple.
            general_health_service catches this to detect plain value lists,
            so it is raised explicitly rather than with an ``assert``
            statement, which ``python -O`` would strip.
        IndexError: if *t_list* is empty.
    """
    if type(t_list[0]) != tuple:
        raise AssertionError(t_list)
    return {t[0]: t[1] for t in t_list if 'xmlns' not in t}
def convert(tuple_list):
    """Convert a list of (possibly nested) tuples to a flat dict.

    Each entry is (name, value); while the value is itself a tuple the entry
    is unwrapped one level and the name of the level being left is appended
    to the dotted key, so ('b', ('c', 'v')) is stored under 'b.b'.
    Entries named 'xmlns' are skipped.  A key seen more than once has its
    values collected into a list, in order of appearance.
    """
    data = {}
    repeated = set()  # keys whose value has already been turned into a list
    for entry in tuple_list:
        if entry[0] == 'xmlns':
            continue
        parts = [entry[0]]
        while type(entry[1]) == tuple:
            parts.append(entry[0])
            entry = entry[1]
        key = '.'.join(parts)
        value = entry[1]
        if key not in data:
            data[key] = value
        elif key in repeated:
            data[key].append(value)
        else:
            # second occurrence: start a list with the earlier value
            repeated.add(key)
            data[key] = [data[key], value]
    return data
def flatten(obj):
    """Recursively turn a parsed xml structure into nested (key, value) tuples.

    Yields, depending on the type of *obj*:
      * mapping - one (renamed_key, child) tuple per item yielded by
        flattening each value; keys are renamed with parse_items
      * list    - one list per element, holding that element's flattened items
      * text    - the text with newlines replaced by spaces, passed through
        parse_items
      * None    - None

    Any dict is accepted (not only OrderedDict), and the text type is
    ``unicode`` on Python 2 or ``str`` on Python 3, so the function works
    with whichever mapping/text types the xml parser returns.

    Raises:
        TypeError: for any other type (the offending type is the argument).
    """
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # Python 3

    if isinstance(obj, dict):
        for o in obj:
            _o = parse_items(o)
            for c in flatten(obj[o]):
                yield (_o, c)
    elif type(obj) is list:
        for o in obj:
            yield list(flatten(o))
    elif type(obj) is text_type:
        # NOTE(review): text values go through the same renaming as keys, so
        # a value starting with '@' or '#' loses that character - confirm
        # this is intended.
        yield parse_items(obj.replace('\n', ' '))
    elif obj is None:
        yield None
    else:
        raise TypeError(type(obj))
def get_key_item(kv, path=None):
    """Collapse a nested (key, (key, (... value))) tuple into (dotted_key, value).

    Args:
        kv: a 2-tuple (key, value); value may itself be such a 2-tuple.
        path: optional list of key parts to prefix the result with.

    Returns:
        (dotted_key, value) where dotted_key joins *path* and every key on
        the way down with '.', and value is the first non-tuple value found.

    The first parameter used to be a tuple-unpacking parameter ``(k, v)``,
    which is a syntax error on Python 3; it is unpacked in the body instead.
    The default for *path* is None rather than a shared mutable list.
    """
    parts = list(path) if path else []
    k, v = kv
    parts.append(k)
    while type(v) is tuple:
        k, v = v
        parts.append(k)
    return ('.'.join(parts), v)
#
#
# These handle specific xml components - they are mapped using the item_map dictionary below
# They take a key-value pair
# Key: XML path
# Value: Tuple of values for processing
# The functions are generators - i.e. they yield (key, value) result tuples
#
def hospital_medical_service(k, _v):
    """Yield one ('Hospital.MedicalServices.<name>', value) pair for a medical service.

    *_v* must be a list of (key, value) tuples; *k* is not used.  The value
    depends on the 'Cover' entry:
      * 'Covered'    - 'Covered' when Partial is 'false', else 'Covered-partial'
      * 'Restricted' - 'Partially-Restricted' when Partial is 'true',
                       else 'Restricted'
      * 'NotCovered' - 'NotCovered'
      * 'BLP'        - the LimitationPeriod, under
                       '<Title>.BenefitLimitationPeriod'

    Raises TypeError when *_v* is not a list and NotImplementedError for any
    other 'Cover' value.
    """
    if type(_v) is not list:
        raise TypeError
    v = t_to_dict(_v)
    cover = v['Cover']
    partial = v.get('Partial')

    if cover == 'Covered':
        # NOTE(review): a missing Partial entry counts as partial here but as
        # not partial for 'Restricted' - confirm the asymmetry is intended.
        status = cover if partial == 'false' else cover + '-partial'
        result = (v['Title'], status)
    elif cover == 'Restricted':
        status = 'Partially-Restricted' if partial == 'true' else cover
        result = (v['Title'], status)
    elif cover == 'NotCovered':
        result = (v['Title'], cover)
    elif cover == 'BLP':
        result = (v['Title'] + '.BenefitLimitationPeriod', v['LimitationPeriod'])
    else:
        raise NotImplementedError(v)

    yield ('Hospital.MedicalServices.' + result[0], result[1])
def hospital_waiting_period(k, _v):
    """Yield a single (column, value) pair describing a hospital waiting period.

    *_v* is a list of (key, value) tuples; *k* is not used.  The column is
    'Hospital.WaitingPeriod.<Title>' ('Hospital.WaitingPeriod' without a
    Title).  The value is 'NoWaitingPeriod' for a period of 0, otherwise
    '<n> <unit>' with the unit lower-cased and defaulting to 'month'.

    When the 'text' entry is missing or is not an integer, (None, None) is
    yielded so the caller skips the item.  Only those failures are handled:
    the former bare ``except:`` wrapped the ``yield`` statements as well and
    would also have swallowed KeyboardInterrupt and GeneratorExit.
    """
    v = t_to_dict(_v)
    if 'Title' in v:
        title = 'Hospital.WaitingPeriod.' + v['Title']
    else:
        title = 'Hospital.WaitingPeriod'
    unit = v['Unit'].lower() if 'Unit' in v else 'month'

    try:
        w = int(v['text'])
    except (KeyError, TypeError, ValueError):
        yield (None, None)
        return

    if w == 0:
        yield (title, 'NoWaitingPeriod')
    else:
        yield (title, str(w) + ' ' + unit)
# Various combinations of data for the benefit types for general health services.
# Each entry is the sorted list of keys of one converted GeneralHealthService;
# general_health_service looks a service up by its position in this list, so
# the order must not change.  The trailing comment on each entry is a number
# carried over from the original table (presumably an occurrence count)
# followed by the entry's index.
ghs_benefit_types = [
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList', u'Covered',
     u'HasSpecialFeatures', u'Title', u'WaitingPeriod'],           # 18490  0
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList', u'Covered',
     u'Title', u'WaitingPeriod'],                                  # 2722   1
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList.Benefit', u'Covered',
     u'HasSpecialFeatures', u'Title', u'WaitingPeriod'],           # 10676  2
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList.Benefit', u'Covered',
     u'Title', u'WaitingPeriod'],                                  # 1288   3
    [u'Covered', u'HasSpecialFeatures', u'Title'],                 # 6109   4
    [u'Covered', u'Title'],                                        # 715    5
]
def general_health_service(k, _v):
    """Yield the flat columns for one GeneralHealthService element.

    *_v* is the tuple list for the service; the incoming *k* is replaced by
    the 'General.Service' prefix.  The sorted keys of the converted service
    must match an entry of ghs_benefit_types (ValueError otherwise).

    Entries 4 and 5 describe services that are not covered: they are only
    sanity-checked and yield nothing.  For covered services the generator
    yields, under 'General.Service.<BenefitLimitsGroup>.<Title>':
      * '.WaitingPeriod'   - '<n> months'
      * '.SpecialFeatures' - only when HasSpecialFeatures is present
      * '.<Item>'          - one per benefit, as '$<amount>' or '<amount>%'

    Differences from the previous version: the presence of
    HasSpecialFeatures is tested directly instead of wrapping a ``yield`` in
    a bare ``except: pass``, and the per-benefit fallback no longer relies on
    ``assert`` statements in this function (which ``python -O`` removes).
    """
    # k = 'GeneralHealthCover.GeneralHealthServices.GeneralHealthService'
    k = 'General.Service'
    v = convert(_v)
    btype = ghs_benefit_types.index(sorted(v))

    if btype >= 4:
        # Not covered: nothing to report, just check the data is as expected.
        assert v['Covered'] in ['0', 'false']
        if btype == 4:
            assert v['HasSpecialFeatures'] == '0'
        return

    assert v['Covered'] in ['1', 'true']
    k = k + '.' + v['BenefitLimitsGroup'] + '.' + v['Title']
    yield (k + '.WaitingPeriod', v['WaitingPeriod'] + ' months')
    if 'HasSpecialFeatures' in v:
        yield (k + '.SpecialFeatures', v['HasSpecialFeatures'])

    if btype in [2, 3]:
        # a single benefit: wrap it so the loop below handles both shapes
        benefits = [v['BenefitsList.BenefitsList.Benefit']]
    else:
        benefits = v['BenefitsList.BenefitsList']

    for b in benefits:
        try:
            b_dict = t_to_dict(b)
        except AssertionError:
            # b is a plain list of values rather than (key, value) tuples
            b_dict = None
        if b_dict is not None:
            if len(b_dict) == 2:
                # only Item and text given: the amount is in dollars
                b_dict[u'Type'] = 'Dollars'
            if any(_n not in b_dict for _n in ('Item', 'Type', 'text')):
                b_dict = None
        if b_dict is None:
            # positional form: [Item, Type, text] or [Item, text]
            if len(b) == 2:
                b.insert(1, 'Dollars')
            b_dict = {
                u'Item': b[0],
                u'Type': b[1],
                'text': b[2],
            }

        if b_dict['Type'] == 'Dollars':
            b_amount = '$' + b_dict['text']
        elif b_dict['Type'] == 'Percent':
            b_amount = b_dict['text'] + '%'
        else:
            raise Exception(b_dict)
        yield (k + '.' + b_dict['Item'], b_amount)
# The possible service types
service_types = [
    'Acupuncture', 'Ambulance', 'Chiropractic', 'DentalGeneral',
    'DentalMajor', 'Endodontic', 'GlucoseMonitor', 'HearingAids',
    'Massage', 'Naturopathy', 'NonPBS', 'Optical',
    'Orthodontic', 'Physiotherapy', 'Podiatry', 'Psychology',
]

# The possible limit types
limit_types = set([
    'AnnualLimit',
    'LimitPerPerson',
    'LimitPerPolicy',
    'LifetimeLimit',
    'LimitPerService',
])
#Various combinations of benefit limits | |
# 94 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'Title'] | |
# 1448 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'Title'] | |
# 411 [u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title'] | |
# 124149 [u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title'] | |
# 86642 [u'ServicesCombined.ServicesCombined', u'Title'] | |
# 46664 [u'ServicesCombined.ServicesCombined', u'Title'] | |
# 21 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title'] | |
# 18414 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title'] | |
def general_benefit_limits(k, _v):
    """Yield the flat columns for one BenefitLimit element.

    *_v* is the tuple list for the limit; the incoming *k* is replaced by
    'General.Service.<Title>.Limit'.  Yields, in this order:
      * ('General.Service.PreventativeDental.Limit', 'NoLimit') when
        NoLimitOnPreventativeDental is 'true'
      * '<k>.OtherUnlisted' ('True'/'False') when the combined-services data
        starts with a 'true'/'false' flag
      * '<k>.<service>.IndLifetimeLimit' for services carrying that detail
      * '<k>.<limit>' for every limit type present, as '$<amount>'

    Runs on Python 2 and 3 (no print statement, ``unicode`` or
    ``iteritems``).  The dump written when a service list cannot be
    converted goes to stderr, because stdout carries the flat file.
    """
    try:
        string_types = [str, unicode]  # Python 2
    except NameError:
        string_types = [str]  # Python 3

    v = convert(_v)
    # Map to General.Service
    k = 'General.Service.{}.Limit'.format(v['Title'])
    if v.get('NoLimitOnPreventativeDental') == 'true':
        yield ('General.Service.PreventativeDental.Limit', 'NoLimit')

    _services = {}
    other_unlisted = None
    services_combined = v['ServicesCombined.ServicesCombined']

    if services_combined in ['true', 'false']:
        # a bare flag plus exactly one service
        other_unlisted = services_combined == 'true'
        sl, service = v['ServicesCombined.ServicesCombined.Service']
        _services[service] = {
            'SubLimitsApply': sl
        }
    elif services_combined[0] in ['true', 'false']:
        # a flag followed by service names and/or attribute lists
        other_unlisted = services_combined[0] == 'true'
        services = services_combined[1:]
        for service in services:
            if type(service) in string_types:
                _services[service] = None
            elif type(service) == list:
                if type(service[0]) in string_types:
                    for _s in service:
                        _services[_s] = None
                elif type(service[0]) == tuple:
                    try:
                        t = t_to_dict(service)
                    except:
                        # show the offending data, then let the error propagate
                        sys.stderr.write(repr(services) + '\n')
                        raise
                    text = t.pop('text')
                    _services[text] = t
    else:
        # a single service name with no flag
        _services[services_combined] = None
        assert services_combined in service_types

    if other_unlisted is not None:
        yield (k + '.OtherUnlisted', str(other_unlisted))

    for service, details in _services.items():
        if details is not None and 'IndLifetimeLimit' in details:
            yield (k + '.' + service + '.IndLifetimeLimit',
                   '$' + details['IndLifetimeLimit'])

    for limit in limit_types:
        if limit in v:
            yield (k + '.' + limit, '$' + v[limit])
# Keep track of ambulance waiting details: holds the Unit seen most recently
# until the matching text item arrives, and is None otherwise.
aw = None


# This is a very "hacky" way of doing this... (but we do check sanity below)
def ambulance_waiting(k, _v):
    """Combine the ambulance WaitingPeriod Unit and text items into one column.

    Called once with a key ending in 'Unit' (the unit is remembered in the
    module-level ``aw`` and nothing is yielded) and then once with a key
    ending in 'text', which yields ('General.Ambulance.WaitingPeriod', ...)
    with 'NoWaitingPeriod' for a period of 0 and '<text> <unit>' otherwise,
    and clears ``aw`` again.  Keys with any other ending yield nothing.

    Raises:
        AssertionError: if a Unit arrives while one is still pending, or a
            text arrives without a pending Unit.  Raised explicitly so the
            check is not removed by ``python -O``.
    """
    global aw
    _k = k.split('.')[-1]
    if _k == 'Unit':
        if aw is not None:
            raise AssertionError('ambulance waiting Unit seen twice without text')
        aw = _v
    elif _k == 'text':
        if aw is None:
            raise AssertionError('ambulance waiting text seen without a Unit')
        try:
            w = int(_v)
        except (TypeError, ValueError):
            # not a number: fall through and report the raw text
            w = None
        if w == 0:
            yield ('General.Ambulance.WaitingPeriod', 'NoWaitingPeriod')
        else:
            yield ('General.Ambulance.WaitingPeriod', _v + ' ' + aw.lower())
        aw = None
def hospital_excess_waivers(k, _v):
    """Yield ('Hospital.Excesses.ExcessWaivers.<waiver>', 'True').

    <waiver> is *_v* itself, or its first element when *_v* is a list.
    The incoming *k* is not used.
    """
    waiver = _v[0] if type(_v) == list else _v
    yield ('Hospital.Excesses.ExcessWaivers.' + waiver, 'True')
def collapse_text(k, _v):
    """Yield (*k* without its last five characters, *_v*).

    Used for keys ending in '.text', so the value ends up under the parent
    key.  The value yielded is the ``_v`` argument; the function used to
    yield ``v``, which is not a parameter and only resolved because the main
    loop happens to keep the current value in a module-level ``v``.
    """
    yield (k[:-5], _v)
def skip(k, v):
    """Ignore the item: yield a single (None, None) pair, which the caller drops."""
    yield None, None
def show(k, v):
    """Debug helper: print the key and the value, then yield (None, None)."""
    print(k)
    print(v)
    yield None, None
# | |
# Define which XML components are handled by which functions | |
# | |
item_map = {
    # Hospital cover
    'Hospital.Waiting.WaitingPeriod': hospital_waiting_period,
    'Hospital.MedicalServices.MedicalService': hospital_medical_service,
    'Hospital.Excesses.ExcessWaivers.Waiver': hospital_excess_waivers,

    # General health cover: services and limits
    'GeneralHealthCover.GeneralHealthServices.GeneralHealthService': general_health_service,
    'GeneralHealthCover.BenefitLimits.BenefitLimit': general_benefit_limits,
    'GeneralHealthCover.PreferredProviderServices.List.Service': skip,

    # General health cover: '.text' items stored under their parent key
    'GeneralHealthCover.ClassificationGeneralHealth.text': collapse_text,
    'GeneralHealthCover.OtherProductFeatures.text': collapse_text,
    'GeneralHealthCover.SpecialFeatures.text': collapse_text,

    # Ambulance waiting period arrives as two separate items
    'GeneralHealthCover.GeneralHealthAmbulance.WaitingPeriod.Unit': ambulance_waiting,
    'GeneralHealthCover.GeneralHealthAmbulance.WaitingPeriod.text': ambulance_waiting,
}
# | |
# Now the actual processing begins - iterate through each xml file | |
# | |
# Build one flat plan dict per xml file, recording every column name used.
# The loop variables k and v are module-level names; they are kept as-is
# because helper functions in this file may read them.
for fn in xml_files:
    xml = flatten_xml(fn)
    items = flatten(xml['Product'])
    plan = {}
    for i in items:
        k, v = get_key_item(i)
        if v is None or 'xmlns' in k:
            continue

        if k not in item_map:
            # No dedicated handler: store the value under its xml path
            if DEBUG:
                continue
            if type(v) is list:
                # repeated elements need a handler in item_map
                raise Exception(k)
            plan[k] = v
            headers.append(k)
            continue

        item_processor = item_map[k]
        try:
            processed = list(item_processor(k, v))
        except:
            # show which xml path failed, then let the error propagate
            print(k)
            raise
        for (_k, _v) in processed:
            if _v is not None:
                plan[_k] = _v
                headers.append(_k)

    if not DEBUG:
        plans.append(plan)
    if LIMIT is not None:
        LIMIT -= 1
        if LIMIT == 0:
            break

if DEBUG:
    exit()
# Write the flattened data to stdout as tab-separated text.
# print(...) with a single argument behaves the same on Python 2 and 3.

# First the headers: unique column names in first-seen order.  A set tracks
# what has been seen so each lookup is constant-time; `headers` holds one
# entry per value of every plan, which made list-membership tests quadratic.
s_headers = []
_seen_headers = set()
for s in headers:
    if s not in _seen_headers:
        _seen_headers.add(s)
        s_headers.append(s)

# The first 25 columns keep their first-seen order; the rest are sorted.
s_headers = s_headers[:25] + sorted(s_headers[25:])
print('\t'.join(s_headers))

# Then each plan, one row per plan, with '' for columns it does not have
for plan in plans:
    items = []
    for k in s_headers:
        if k not in plan:
            items.append('')
            continue
        value = plan[k]
        # commas, newlines, tabs and double quotes would break the flat format
        for _ch in (',', '\n', '\t', '"'):
            value = value.replace(_ch, ' ')
        items.append(value.strip())
    try:
        print('\t'.join(items))
    except TypeError:
        # a non-string value slipped through: dump the row before failing
        sys.stderr.write(json.dumps(items, indent=1))
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment