Last active
October 27, 2015 20:32
-
-
Save kjam/b237fc5b503be3f7b438 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import agate | |
import xlrd | |
from xlrd.sheet import ctype_text | |
text_type = agate.Text() | |
number_type = agate.Number() | |
date_type = agate.Date() | |
def get_types(example_row): | |
types = [] | |
for v in example_row: | |
value_type = ctype_text[v.ctype] | |
if value_type == 'text': | |
types.append(text_type) | |
elif value_type == 'number': | |
types.append(number_type) | |
elif value_type == 'xldate': | |
types.append(date_type) | |
else: | |
types.append(text_type) | |
return types | |
def float_to_str(val): | |
if isinstance(val, float): | |
return str(val) | |
elif isinstance(val, (str, unicode)): | |
return val.encode('ascii', errors='replace').strip() | |
return val | |
def get_new_array(old_array, function_to_clean): | |
new_arr = [] | |
for row in old_array: | |
cleaned_row = [function_to_clean(rv) for rv in row] | |
new_arr.append(cleaned_row) | |
return new_arr | |
def remove_bad_chars(val): | |
if val == '-': | |
return None | |
return val | |
def get_pci_data(): | |
pci_workbook = xlrd.open_workbook('perceived_corruption_index.xls') | |
pci_sheet = pci_workbook.sheets()[0] | |
pci_title_rows = zip(pci_sheet.row_values(1), pci_sheet.row_values(2)) | |
pci_titles = [t[0] + ' ' + t[1] for t in pci_title_rows] | |
pci_titles = [t.strip() for t in pci_titles] | |
pci_titles[0] = pci_titles[0] + ' Duplicate' | |
pci_rows = [pci_sheet.row_values(r) for r in range(3, pci_sheet.nrows)] | |
pci_rows = get_new_array(pci_rows, float_to_str) | |
pci_types = get_types(pci_sheet.row(3)) | |
return agate.Table(pci_rows, zip(pci_titles, pci_types)) | |
def cl_data(): | |
workbook = xlrd.open_workbook('unicef_oct_2014.xls') | |
sheet = workbook.sheets()[0] | |
title_rows = zip(sheet.row_values(4), sheet.row_values(5)) | |
titles = [t[0] + ' ' + t[1] for t in title_rows] | |
titles = [t.strip() if len(t.strip()) else 'Unknown' for t in titles] | |
country_rows = [sheet.row_values(r) for r in range(6, 114)] | |
country_rows = get_new_array(country_rows, float_to_str) | |
country_rows = get_new_array(country_rows, remove_bad_chars) | |
cl_types = get_types(sheet.row(6)) | |
return agate.Table(country_rows, zip(titles, cl_types)) | |
def main(): | |
cl_table = cl_data() | |
pci_table = get_pci_data() | |
print(pci_table.column_names) | |
print(cl_table.column_names) | |
new_pci_table = pci_table.join(cl_table, 'Country / Territory', | |
'Countries and areas', inner=True) | |
try: | |
new_cl_table = cl_table.join(pci_table, 'Countries and areas', | |
'Country / Territory', inner=True) | |
except Exception as e: | |
print(e) | |
new_cl_table = False | |
ranked = cl_table.compute([(agate.Rank('Total (%)', reverse=True), | |
'Total Child Labor Rank')]) | |
pci_rank = pci_table.join(ranked, 'Country / Territory', | |
'Countries and areas', inner=True) | |
print(pci_rank.columns['Total (%)'].values()) | |
return new_pci_table, new_cl_table, pci_rank | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment