Skip to content

Instantly share code, notes, and snippets.

@kjam
Last active October 27, 2015 20:32
Show Gist options
  • Save kjam/b237fc5b503be3f7b438 to your computer and use it in GitHub Desktop.
Save kjam/b237fc5b503be3f7b438 to your computer and use it in GitHub Desktop.
from __future__ import print_function
import agate
import xlrd
from xlrd.sheet import ctype_text
text_type = agate.Text()
number_type = agate.Number()
date_type = agate.Date()
def get_types(example_row):
types = []
for v in example_row:
value_type = ctype_text[v.ctype]
if value_type == 'text':
types.append(text_type)
elif value_type == 'number':
types.append(number_type)
elif value_type == 'xldate':
types.append(date_type)
else:
types.append(text_type)
return types
def float_to_str(val):
if isinstance(val, float):
return str(val)
elif isinstance(val, (str, unicode)):
return val.encode('ascii', errors='replace').strip()
return val
def get_new_array(old_array, function_to_clean):
new_arr = []
for row in old_array:
cleaned_row = [function_to_clean(rv) for rv in row]
new_arr.append(cleaned_row)
return new_arr
def remove_bad_chars(val):
if val == '-':
return None
return val
def get_pci_data():
pci_workbook = xlrd.open_workbook('perceived_corruption_index.xls')
pci_sheet = pci_workbook.sheets()[0]
pci_title_rows = zip(pci_sheet.row_values(1), pci_sheet.row_values(2))
pci_titles = [t[0] + ' ' + t[1] for t in pci_title_rows]
pci_titles = [t.strip() for t in pci_titles]
pci_titles[0] = pci_titles[0] + ' Duplicate'
pci_rows = [pci_sheet.row_values(r) for r in range(3, pci_sheet.nrows)]
pci_rows = get_new_array(pci_rows, float_to_str)
pci_types = get_types(pci_sheet.row(3))
return agate.Table(pci_rows, zip(pci_titles, pci_types))
def cl_data():
workbook = xlrd.open_workbook('unicef_oct_2014.xls')
sheet = workbook.sheets()[0]
title_rows = zip(sheet.row_values(4), sheet.row_values(5))
titles = [t[0] + ' ' + t[1] for t in title_rows]
titles = [t.strip() if len(t.strip()) else 'Unknown' for t in titles]
country_rows = [sheet.row_values(r) for r in range(6, 114)]
country_rows = get_new_array(country_rows, float_to_str)
country_rows = get_new_array(country_rows, remove_bad_chars)
cl_types = get_types(sheet.row(6))
return agate.Table(country_rows, zip(titles, cl_types))
def main():
cl_table = cl_data()
pci_table = get_pci_data()
print(pci_table.column_names)
print(cl_table.column_names)
new_pci_table = pci_table.join(cl_table, 'Country / Territory',
'Countries and areas', inner=True)
try:
new_cl_table = cl_table.join(pci_table, 'Countries and areas',
'Country / Territory', inner=True)
except Exception as e:
print(e)
new_cl_table = False
ranked = cl_table.compute([(agate.Rank('Total (%)', reverse=True),
'Total Child Labor Rank')])
pci_rank = pci_table.join(ranked, 'Country / Territory',
'Countries and areas', inner=True)
print(pci_rank.columns['Total (%)'].values())
return new_pci_table, new_cl_table, pci_rank
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment