Created
April 9, 2016 06:53
-
-
Save Jimexist/62aae3027eeafd6a03b487674aa5f153 to your computer and use it in GitHub Desktop.
HCP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import wraps | |
import csv | |
import json | |
import re | |
from collections import namedtuple | |
# Flat record emitted by parse_group: one value of one named item in one
# month, tagged with its country and group.
Item = namedtuple(
    'Item',
    ['country', 'group_id', 'group', 'name', 'name_id', 'month', 'value'],
)
def until_empty(func):
    """Decorate a row parser so it receives one blank-row-delimited section.

    The returned wrapper takes a row iterator (for example a ``csv.reader``)
    in place of ``func``'s first argument. It consumes rows up to and
    including the next row whose cells are all empty, strips surrounding
    whitespace from every cell of the rows read before it, and calls
    ``func`` with that list followed by any remaining arguments.

    Args:
        func: callable taking a list of rows (lists of stripped strings) as
            its first argument.

    Returns:
        The wrapping function, which returns whatever ``func`` returns.

    Raises:
        ValueError: (from the wrapper) if no non-empty row was read before a
            blank row or the end of the iterator.
    """
    @wraps(func)
    def func_wrapper(reader, *args, **kwargs):
        rows = []
        for row in reader:
            if not any(row):
                # The blank separator row is consumed here, so the next
                # call on the same iterator starts at the following section.
                break
            rows.append([cell.strip() for cell in row])
        if not rows:
            # ValueError is a subclass of Exception, so callers that catch
            # Exception to detect the end of the data keep working.
            raise ValueError("too many empty rows")
        return func(rows, *args, **kwargs)
    return func_wrapper
@until_empty
def parse_country(rows):
    """Return the country: the second cell of the section's first row."""
    first_row = rows[0]
    return first_row[1]
@until_empty
def parse_months(rows):
    """Return the non-empty cells of the first row, from the third cell on."""
    header_cells = rows[0][2:]
    return list(filter(None, header_cells))
def transform_value(val):
    """Convert a numeric cell to a number.

    Surrounding whitespace and all commas are removed first. A blank cell
    or the literal ``'- 0'`` gives the integer ``0``; anything else is
    passed to ``float``, which raises ``ValueError`` on non-numeric text.
    """
    cleaned = val.strip().replace(',', '')
    is_zero = (not cleaned) or cleaned == '- 0'
    return 0 if is_zero else float(cleaned)
@until_empty
def parse_group(rows, group_id, months, country):
    """Yield one ``Item`` per non-empty value cell of a group section.

    The first row names the group in its second cell. If that row has no
    values from the third cell on, it is a pure header and the remaining
    rows are the group's items; otherwise the row is itself the group's
    only item. Groups and items whose name starts with ``"Total"`` are
    skipped (and reported on stdout).

    Args:
        rows: the section's rows, as lists of stripped strings.
        group_id: identifier stored on every yielded item.
        months: month labels, paired positionally with each row's cells
            from the third cell on.
        country: country label stored on every yielded item.

    Yields:
        Item: one per non-empty value cell, with ``value`` converted by
        ``transform_value`` and ``name_id`` the item's index in the group.

    Raises:
        ValueError: if the first row carries values but the section has
            more than one row, or if a value cell is not numeric.
    """
    if not rows:
        # Defensive only: until_empty raises before calling with no rows.
        return
    header = rows[0]
    group_name = header[1]
    if group_name.startswith("Total"):
        print("skipping", country, group_name)
        return
    print("processing", country, "group", group_id)
    if any(header[2:]):
        # The first row has values of its own, so it must be the only row.
        # Raised explicitly (not asserted) so the check survives ``-O``.
        if len(rows) != 1:
            raise ValueError("must be a single item group")
        details = [header]
    else:
        details = rows[1:]
    for name_id, row in enumerate(details):
        item_name = row[1]
        if item_name.startswith("Total"):
            print("skipping", country, item_name)
            continue
        # zip stops at the shorter sequence, so surplus cells or surplus
        # month labels are ignored.
        for cell, month in zip(row[2:], months):
            if cell:
                yield Item(
                    country=country,
                    group=group_name,
                    group_id=group_id,
                    name=item_name,
                    name_id=name_id,
                    month=month,
                    value=transform_value(cell))
def parse_csv(fname):
    """Yield every item parsed from the CSV file ``fname`` as a dict.

    The file is read as blank-row-separated sections: the first supplies
    the country, the second the month labels, and each following section is
    handed to ``parse_group`` with a running ``group_id`` starting at 0.

    Group parsing stops at the first exception, which is printed rather
    than re-raised. This is also how the end of the file is detected: the
    section reader raises once no non-empty rows are left.

    Args:
        fname: path of the CSV file to read.

    Yields:
        dict: the fields of each ``Item`` (via ``_asdict()``).

    Raises:
        Exception: whatever ``parse_country`` or ``parse_months`` raise if
            the country or month section is missing; these calls are not
            guarded.
    """
    # newline='' lets the csv module do its own newline handling, which its
    # documentation requires for quoted fields containing line breaks.
    with open(fname, newline='') as fin:
        reader = csv.reader(fin)
        country = parse_country(reader)
        months = parse_months(reader)
        group_id = 0
        while True:
            try:
                for item in parse_group(reader, group_id, months, country):
                    yield item._asdict()
            except Exception as e:
                # NOTE(review): besides the expected end-of-data signal this
                # also hides genuine parse errors; they are only printed.
                print('got exception', e)
                break
            # Reached only when the group was consumed without an error.
            group_id += 1
import os

# Collect the items of every .csv file found under the current directory
# (walked bottom-up) and write them to output.json as UTF-8 encoded JSON.
all_items = []
for root, dirs, files in os.walk('.', topdown=False):
    for name in files:
        if not name.endswith('.csv'):
            continue
        csv_path = os.path.join(root, name)
        all_items.extend(list(parse_csv(csv_path)))

val = json.dumps(all_items, ensure_ascii=False, indent=2)
encoded = val.encode('utf-8')
with open('output.json', 'wb') as fout:
    fout.write(encoded)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment