Skip to content

Instantly share code, notes, and snippets.

@xlfe
Created April 18, 2014 11:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xlfe/fe28ad7782432623f810 to your computer and use it in GitHub Desktop.
Save xlfe/fe28ad7782432623f810 to your computer and use it in GitHub Desktop.
Scans for all .xml files in subdirectories and converts them to one big flat format, outputted to stdout
#!/usr/bin/which python
# flatten.py
# Written by Felix Barbalet (Pivotal Analytics Pty Ltd)
# Scans for all .xml files in subdirectories and converts them to one big flat format, outputted to stdout
import os,sys
import xmltodict
from collections import OrderedDict
import json
# Module-level state shared by the processing and output stages below
headers = []
plans = []
xml_files = []

# Debugging options
LIMIT = None   # Set this to a positive number to limit the number of xml files processed
DEBUG = False  # Set this to true to suppress output of the flat file at the end of the process

# Walk subdirectories collecting xml files. Any path starting with './.'
# (a hidden top-level directory and everything beneath it) is ignored.
for path, dirs, files in os.walk('./'):
    if not path.startswith('./.'):
        xml_files.extend(os.path.join(path, f) for f in files if f.endswith('.xml'))
#
#
# Some misc helper functions
def flatten_xml(fn):
    """Read the XML file at path *fn* and return ``xmltodict.parse`` of it.

    The file is read inside a ``with`` block so the handle is always closed,
    including when reading raises; previously it was left open for the
    garbage collector to clean up.
    """
    with open(fn, 'r') as _file:
        contents = _file.read()
    return xmltodict.parse(contents)
def remove_at(i):
    """Return *i* without its first character when that is '@' or '#'.

    Any other string (including the empty string) is returned unchanged.
    """
    return i[1:] if i.startswith(('@', '#')) else i
# Ordered list of string -> string transforms applied by parse_items().
rename_map = [
    remove_at,
    lambda name: name.replace('HospitalCover', 'Hospital'),
    lambda name: name.replace('WaitingPeriods', 'Waiting'),
]


def parse_items(t):
    """Run *t* through every transform in ``rename_map``, in order.

    Each transform receives the output of the previous one; the final
    string is returned.
    """
    renamed = t
    for transform in rename_map:
        renamed = transform(renamed)
    return renamed
def t_to_dict(t_list):
    """Build a dict from a list of ``(key, value)`` tuples.

    Entries for which ``'xmlns' in entry`` is true (for a tuple: any element
    equal to ``'xmlns'``) are left out. When a key occurs more than once the
    last value wins.

    Raises:
        AssertionError: if the first element of *t_list* is not a tuple.
            This is raised explicitly rather than with an ``assert``
            statement so that it still fires under ``python -O``; code in
            this file catches it to detect non-tuple input.
        IndexError: if *t_list* is empty.
    """
    if not isinstance(t_list[0], tuple):
        raise AssertionError(t_list)
    return {t[0]: t[1] for t in t_list if 'xmlns' not in t}
def convert(tuple_list):
    """Collapse a list of (possibly nested) tuples into a flat dict.

    Entries whose first element is ``'xmlns'`` are skipped. For a nested
    entry the key is built while descending: at every level the name of the
    tuple being left is appended, so the outermost name appears twice and the
    innermost name is not part of the key, e.g.
    ``('B', ('Benefit', 'v'))`` is stored under ``'B.B'``.

    The first value seen for a key is stored as-is; from the second
    occurrence onwards the values for that key are collected in a list.
    """
    data = {}
    repeated = set()  # keys whose value has already been turned into a list
    for entry in tuple_list:
        if entry[0] == 'xmlns':
            continue
        key = entry[0]
        while type(entry[1]) is tuple:
            key = key + '.' + entry[0]
            entry = entry[1]
        value = entry[1]
        if key not in data:
            data[key] = value
        elif key in repeated:
            data[key].append(value)
        else:
            repeated.add(key)
            data[key] = [data[key], value]
    return data
def flatten(obj):
    """Generate a flattened view of a parsed-XML object.

    * mapping: for every key, yields ``(parse_items(key), child)`` for each
      item produced by flattening the corresponding value;
    * list: yields, per element, a list of that element's flattened items;
    * text: yields the text with newlines replaced by spaces and passed
      through ``parse_items``;
    * ``None``: yields ``None``.

    Any ``dict`` (not only ``OrderedDict``) is accepted as a mapping, and
    text is recognised through ``str`` / ``type(u'')`` so no Python 2 only
    name is needed.

    Raises:
        TypeError: for any other object type (raised while iterating).
    """
    if isinstance(obj, dict):
        for name in obj:
            renamed = parse_items(name)
            for child in flatten(obj[name]):
                yield (renamed, child)
    elif isinstance(obj, list):
        for element in obj:
            yield list(flatten(element))
    elif isinstance(obj, (str, type(u''))):
        yield parse_items(obj.replace('\n', ' '))
    elif obj is None:
        yield None
    else:
        raise TypeError(type(obj))
def get_key_item(kv, path=None):
    """Resolve a nested ``(key, value)`` tuple to ``(dotted_key, value)``.

    *kv* is a 2-tuple whose value may itself be a 2-tuple, and so on. The
    keys met on the way down are joined with '.', prefixed by the names in
    *path* (if given), and returned together with the first value that is
    not a tuple.

    The pair is unpacked in the body instead of in the parameter list
    (tuple parameters are not valid Python 3 syntax), and *path* defaults to
    ``None`` rather than a shared mutable list.
    """
    k, v = kv
    parts = list(path) if path else []
    parts.append(k)
    while type(v) is tuple:
        k, v = v
        parts.append(k)
    return ('.'.join(parts), v)
#
#
# These handle specific xml components - they are mapped using the item_map dictionary below
# They take a key-value pair
# Key: XML path
# Value: Tuple of values for processing
# The functions are generators, i.e. they yield (key, value) result tuples
#
def hospital_medical_service(k, _v):
    """Yield one ``('Hospital.MedicalServices.<Title>', status)`` pair.

    *_v* must be a list of tuples; it is turned into a dict with
    ``t_to_dict``. *k* is not used. The status depends on ``Cover``:

    * ``Covered``: ``'Covered'`` only when ``Partial == 'false'``, otherwise
      (including when ``Partial`` is absent) ``'Covered-partial'``;
    * ``Restricted``: ``'Partially-Restricted'`` when ``Partial == 'true'``,
      otherwise ``'Restricted'``;
    * ``NotCovered``: ``'NotCovered'``;
    * ``BLP``: the key gets a ``.BenefitLimitationPeriod`` suffix and the
      value is ``LimitationPeriod``.

    Raises (while iterating):
        TypeError: if *_v* is not a list.
        NotImplementedError: for any other ``Cover`` value.
    """
    if type(_v) is not list:
        raise TypeError
    v = t_to_dict(_v)
    cover = v['Cover']
    partial = v.get('Partial')
    if cover == 'Covered':
        name = v['Title']
        status = cover if partial == 'false' else cover + '-partial'
    elif cover == 'Restricted':
        name = v['Title']
        status = 'Partially-Restricted' if partial == 'true' else cover
    elif cover == 'NotCovered':
        name = v['Title']
        status = cover
    elif cover == 'BLP':
        name = v['Title'] + '.BenefitLimitationPeriod'
        status = v['LimitationPeriod']
    else:
        raise NotImplementedError(v)
    yield ('Hospital.MedicalServices.' + name, status)
def hospital_waiting_period(k, _v):
    """Yield one hospital waiting-period ``(key, value)`` pair.

    *_v* is turned into a dict with ``t_to_dict``; *k* is not used. The key
    is ``'Hospital.WaitingPeriod'``, extended with ``'.<Title>'`` when a
    ``Title`` is present. The unit is the lower-cased ``Unit`` or
    ``'month'`` when there is none.

    The value is ``'NoWaitingPeriod'`` for a period of 0, otherwise
    ``'<n> <unit>'``. If ``text`` is missing or is not an integer,
    ``(None, None)`` is yielded instead.

    Only the integer conversion is guarded, and only for the exceptions it
    can raise, so unrelated errors (and generator shutdown) are no longer
    swallowed by a bare ``except``.
    """
    v = t_to_dict(_v)
    if 'Title' in v:
        title = 'Hospital.WaitingPeriod.' + v['Title']
    else:
        title = 'Hospital.WaitingPeriod'
    unit = v['Unit'].lower() if 'Unit' in v else 'month'
    try:
        w = int(v['text'])
    except (KeyError, TypeError, ValueError):
        w = None
    if w is None:
        yield (None, None)
    elif w == 0:
        yield (title, 'NoWaitingPeriod')
    else:
        yield (title, str(w) + ' ' + unit)
# The sorted key combinations that general_health_service() accepts. The
# position of a combination in this list is significant: the function looks
# the index up and branches on it. The number in each trailing comment was
# recorded by the original author (presumably how often the combination
# occurred in the source data - unverified).
ghs_benefit_types = [
    # 0
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList', u'Covered',
     u'HasSpecialFeatures', u'Title', u'WaitingPeriod'],          # 18490
    # 1
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList', u'Covered',
     u'Title', u'WaitingPeriod'],                                 # 2722
    # 2
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList.Benefit', u'Covered',
     u'HasSpecialFeatures', u'Title', u'WaitingPeriod'],          # 10676
    # 3
    [u'BenefitLimitsGroup', u'BenefitsList.BenefitsList.Benefit', u'Covered',
     u'Title', u'WaitingPeriod'],                                 # 1288
    # 4
    [u'Covered', u'HasSpecialFeatures', u'Title'],                # 6109
    # 5
    [u'Covered', u'Title'],                                       # 715
]
def _benefit_fields(b):
    """Return the ``Item`` / ``Type`` / ``text`` mapping for one benefit *b*.

    When *b* starts with a tuple it is read as ``(name, value)`` pairs via
    ``t_to_dict``; a result with exactly two entries gets
    ``Type = 'Dollars'`` added. If that does not produce all three fields,
    or *b* does not start with a tuple, *b* is read positionally as
    ``[Item, Type, text]``, with ``'Dollars'`` inserted as the type when it
    has only two elements. *b* itself is not modified.
    """
    if type(b[0]) is tuple:
        fields = t_to_dict(b)
        if len(fields) == 2:
            fields[u'Type'] = 'Dollars'
        if all(name in fields for name in ('Item', 'Type', 'text')):
            return fields
    values = list(b)
    if len(values) == 2:
        values.insert(1, 'Dollars')
    return {u'Item': values[0], u'Type': values[1], 'text': values[2]}


def general_health_service(k, _v):
    """Yield the flattened ``(key, value)`` pairs for one general health service.

    The incoming *k* is ignored; keys are built from ``'General.Service'``.
    *_v* is collapsed with ``convert`` and its sorted key list is looked up
    in ``ghs_benefit_types`` (``ValueError`` if it is not listed).

    * Index 4 or 5 (no benefit details): ``Covered`` must be ``'0'`` or
      ``'false'`` (and ``HasSpecialFeatures`` must be ``'0'`` for index 4);
      nothing is yielded.
    * Index 0-3: ``Covered`` must be ``'1'`` or ``'true'``. Yields the
      waiting period (``'<n> months'``), ``SpecialFeatures`` when the entry
      has ``HasSpecialFeatures``, and one pair per benefit whose value is
      ``'$<text>'`` for ``Dollars`` or ``'<text>%'`` for ``Percent``.

    Raises (while iterating):
        AssertionError: when a ``Covered`` / ``HasSpecialFeatures`` check
            fails. Raised explicitly so the checks survive ``python -O``.
        Exception: for a benefit type other than ``Dollars`` / ``Percent``.
    """
    # k = 'GeneralHealthCover.GeneralHealthServices.GeneralHealthService'
    k = 'General.Service'
    v = convert(_v)
    btype = ghs_benefit_types.index(sorted(v))

    if btype >= 4:
        if v['Covered'] not in ('0', 'false'):
            raise AssertionError(v)
        if btype == 4 and v['HasSpecialFeatures'] != '0':
            raise AssertionError(v)
        return

    if v['Covered'] not in ('1', 'true'):
        raise AssertionError(v)

    k = k + '.' + v['BenefitLimitsGroup'] + '.' + v['Title']
    yield (k + '.WaitingPeriod', v['WaitingPeriod'] + ' months')

    # Only some key combinations carry HasSpecialFeatures.
    if 'HasSpecialFeatures' in v:
        yield (k + '.SpecialFeatures', v['HasSpecialFeatures'])

    if btype in (2, 3):
        # A single benefit: wrap it so it is handled like a list of benefits.
        benefits = [v['BenefitsList.BenefitsList.Benefit']]
    else:
        benefits = v['BenefitsList.BenefitsList']

    for b in benefits:
        b_dict = _benefit_fields(b)
        if b_dict['Type'] == 'Dollars':
            b_amount = '$' + b_dict['text']
        elif b_dict['Type'] == 'Percent':
            b_amount = b_dict['text'] + '%'
        else:
            raise Exception(b_dict)
        yield (k + '.' + b_dict['Item'], b_amount)
# The possible service types
service_types = [
    'Acupuncture',
    'Ambulance',
    'Chiropractic',
    'DentalGeneral',
    'DentalMajor',
    'Endodontic',
    'GlucoseMonitor',
    'HearingAids',
    'Massage',
    'Naturopathy',
    'NonPBS',
    'Optical',
    'Orthodontic',
    'Physiotherapy',
    'Podiatry',
    'Psychology',
]

# The possible limit types
limit_types = {
    'AnnualLimit',
    'LimitPerPerson',
    'LimitPerPolicy',
    'LifetimeLimit',
    'LimitPerService',
}
#Various combinations of benefit limits
# 94 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'Title']
# 1448 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'Title']
# 411 [u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title']
# 124149 [u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title']
# 86642 [u'ServicesCombined.ServicesCombined', u'Title']
# 46664 [u'ServicesCombined.ServicesCombined', u'Title']
# 21 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title']
# 18414 [u'ServiceCountLimit.ServiceCountLimit', u'ServicesCombined.ServicesCombined', u'ServicesCombined.ServicesCombined.Service', u'Title']
def general_benefit_limits(k, _v):
    """Yield the flattened ``(key, value)`` pairs for one benefit limit.

    The incoming *k* is ignored; keys start with
    ``'General.Service.<Title>.Limit'``. *_v* is collapsed with ``convert``.

    Yields, in this order:

    * ``('General.Service.PreventativeDental.Limit', 'NoLimit')`` when
      ``NoLimitOnPreventativeDental`` is ``'true'``;
    * ``'.OtherUnlisted'`` (``'True'`` / ``'False'``) when the combined
      services value is, or starts with, ``'true'`` / ``'false'``;
    * ``'.<service>.IndLifetimeLimit'`` for every combined service whose
      details contain ``IndLifetimeLimit``;
    * ``'.<limit>'`` for every name in ``limit_types`` present in the entry,
      in sorted name order so the output does not depend on set ordering.

    Dollar amounts are prefixed with ``'$'``.

    Raises (while iterating):
        AssertionError: if a lone combined service is not in
            ``service_types`` (raised explicitly so it survives ``-O``).
    """
    v = convert(_v)
    text_types = (str, type(u''))

    # Map to General.Service
    k = 'General.Service.{}.Limit'.format(v['Title'])

    if v.get('NoLimitOnPreventativeDental') == 'true':
        yield ('General.Service.PreventativeDental.Limit', 'NoLimit')

    _services = {}
    other_unlisted = None
    services_combined = v['ServicesCombined.ServicesCombined']

    if services_combined in ('true', 'false'):
        # A flag plus exactly one service, given as a (sub-limit, name) pair.
        other_unlisted = services_combined == 'true'
        sl, service = v['ServicesCombined.ServicesCombined.Service']
        _services[service] = {'SubLimitsApply': sl}
    elif services_combined[0] in ('true', 'false'):
        # A flag followed by the services themselves.
        other_unlisted = services_combined[0] == 'true'
        services = services_combined[1:]
        for service in services:
            if isinstance(service, text_types):
                _services[service] = None
            elif type(service) is list:
                if isinstance(service[0], text_types):
                    for _s in service:
                        _services[_s] = None
                elif type(service[0]) is tuple:
                    try:
                        t = t_to_dict(service)
                    except Exception:
                        # Diagnostics go to stderr: stdout carries the flat file.
                        sys.stderr.write(repr(services) + '\n')
                        raise
                    text = t.pop('text')
                    _services[text] = t
    else:
        # A single service name without a flag.
        _services[services_combined] = None
        if services_combined not in service_types:
            raise AssertionError(services_combined)

    if other_unlisted is not None:
        yield (k + '.OtherUnlisted', str(other_unlisted))

    for service, details in _services.items():
        if details is not None and 'IndLifetimeLimit' in details:
            yield (k + '.' + service + '.IndLifetimeLimit',
                   '$' + details['IndLifetimeLimit'])

    for limit in sorted(limit_types):
        if limit in v:
            yield (k + '.' + limit, '$' + v[limit])
# Keep track of ambulance waiting details: holds the unit between the 'Unit'
# call and the following 'text' call of ambulance_waiting().
aw = None


# This is a very "hacky" way of doing this... (but we do check sanity below)
def ambulance_waiting(k, _v):
    """Combine the ambulance waiting period's unit and text into one pair.

    The unit and the number arrive in separate calls. A key ending in
    ``.Unit`` stores *_v* in the module-level ``aw`` and yields nothing. A
    key ending in ``.text`` yields
    ``('General.Ambulance.WaitingPeriod', value)`` where the value is
    ``'NoWaitingPeriod'`` when *_v* is the integer 0 and
    ``'<_v> <unit lower-cased>'`` otherwise (also when *_v* is not an
    integer). Keys with any other last component do nothing.

    This is a generator: nothing happens, including storing the unit, until
    it is iterated.

    Raises (while iterating):
        AssertionError: if a unit arrives while one is still pending, or a
            text arrives with no pending unit. Raised explicitly so the
            checks survive ``python -O``.
    """
    global aw
    _k = k.split('.')[-1]
    if _k == 'Unit':
        if aw is not None:
            raise AssertionError('ambulance waiting unit %r is still pending' % (aw,))
        aw = _v
    elif _k == 'text':
        if aw is None:
            raise AssertionError('ambulance waiting text %r has no unit' % (_v,))
        # Clear the pending unit before yielding so the shared state is
        # consistent even if the consumer stops iterating at the yield.
        unit, aw = aw, None
        try:
            w = int(_v)
        except (TypeError, ValueError):
            w = None
        if w == 0:
            yield ('General.Ambulance.WaitingPeriod', 'NoWaitingPeriod')
        else:
            yield ('General.Ambulance.WaitingPeriod', _v + ' ' + unit.lower())
def hospital_excess_waivers(k, _v):
    """Yield ``('Hospital.Excesses.ExcessWaivers.<waiver>', 'True')``.

    The waiver name is the first element of *_v* when *_v* is a list,
    otherwise *_v* itself. The incoming *k* is ignored.
    """
    waiver = _v[0] if type(_v) is list else _v
    yield ('Hospital.Excesses.ExcessWaivers.' + waiver, 'True')
def collapse_text(k, _v):
    """Yield ``(k without its last five characters, _v)``.

    Meant for keys ending in ``'.text'``, so the value ends up under the
    parent key. The value yielded is the *_v* argument; the previous code
    referred to an undefined local ``v`` and only worked when a module-level
    ``v`` happened to hold the same value.
    """
    yield (k[:-5], _v)
def skip(k, v):
    """Ignore the item: yield a single ``(None, None)`` pair."""
    nothing = (None, None)
    yield nothing
def show(k, v):
    """Debug handler: print *k* and *v*, then yield ``(None, None)``.

    Uses the single-argument ``print(...)`` form, which prints the same
    text on Python 2 and Python 3 (the ``print k`` statement is a syntax
    error on Python 3).
    """
    print(k)
    print(v)
    yield (None, None)
#
# Define which XML components are handled by which functions
#
# Maps a flattened XML key path to the generator function that handles it.
item_map = {
    # Hospital cover
    'Hospital.Waiting.WaitingPeriod': hospital_waiting_period,
    'Hospital.MedicalServices.MedicalService': hospital_medical_service,
    'Hospital.Excesses.ExcessWaivers.Waiver': hospital_excess_waivers,

    # General health cover
    'GeneralHealthCover.GeneralHealthServices.GeneralHealthService': general_health_service,
    'GeneralHealthCover.BenefitLimits.BenefitLimit': general_benefit_limits,
    'GeneralHealthCover.PreferredProviderServices.List.Service': skip,

    # Text nodes stored under their parent key
    'GeneralHealthCover.ClassificationGeneralHealth.text': collapse_text,
    'GeneralHealthCover.OtherProductFeatures.text': collapse_text,
    'GeneralHealthCover.SpecialFeatures.text': collapse_text,

    # Ambulance waiting period (unit and text arrive as separate items)
    'GeneralHealthCover.GeneralHealthAmbulance.WaitingPeriod.Unit': ambulance_waiting,
    'GeneralHealthCover.GeneralHealthAmbulance.WaitingPeriod.text': ambulance_waiting,
}
#
# Now the actual processing begins - iterate through each xml file
#
# Build one flat dict ("plan") per xml file. Every key stored in a plan is
# also appended to `headers`, which the output stage de-duplicates later.
for fn in xml_files:
    xml = flatten_xml(fn)
    items = flatten(xml['Product'])
    plan = {}
    for i in items:
        k, v = get_key_item(i)
        if v is None or 'xmlns' in k:
            continue
        if k in item_map:
            item_processor = item_map[k]
            try:
                # Run the handler to completion before storing anything.
                processed = list(item_processor(k, v))
            except Exception:
                # Name the failing key on stderr (stdout carries the flat
                # file), then let the error propagate.
                sys.stderr.write(repr(k) + '\n')
                raise
            for (_k, _v) in processed:
                if _v is None:
                    continue
                plan[_k] = _v
                headers.append(_k)
        elif DEBUG:
            continue
        else:
            # A list here means the key needs a handler in item_map.
            if type(v) is list:
                raise Exception(k)
            plan[k] = v
            headers.append(k)
    if not DEBUG:
        plans.append(plan)
    if LIMIT is not None:
        LIMIT -= 1
        if LIMIT == 0:
            break

if DEBUG:
    # sys.exit() rather than exit(): the latter is only injected by `site`.
    sys.exit()
# Write the flattened data to stdout
# First the headers: de-duplicate in first-seen order. A set tracks what has
# been seen so this stays linear in the (large) number of recorded keys.
seen_headers = set()
s_headers = []
for s in headers:
    if s not in seen_headers:
        seen_headers.add(s)
        s_headers.append(s)

# The first 25 columns keep their discovery order; the rest are sorted.
s_headers = s_headers[:25] + sorted(s_headers[25:])
print('\t'.join(s_headers))

# Then each plan, one tab-separated row per plan
for plan in plans:
    items = []
    for k in s_headers:
        if k not in plan:
            items.append('')
        else:
            # Replace characters that would break the tab-separated layout.
            items.append(plan[k].replace(',', ' ').replace('\n', ' ').replace('\t', ' ').replace('"', ' ').strip())
    try:
        print('\t'.join(items))
    except TypeError:
        # Dump the offending row to stderr before re-raising.
        sys.stderr.write(json.dumps(items, indent=1))
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment