NBER macrohistory database preparation
"""
Download all files from
http://www.nber.org/databases/macrohistory/contents/
and repackage into a data csv and a documentation csv
"""

import argparse
import os
from random import random
from time import sleep

import pandas as pd
import requests

CHAPTERS = [str(x).zfill(2) for x in range(1, 17)]
# URL templates; format with the chapter number and the file name
DATA_BASE_URL = "http://www.nber.org/databases/macrohistory/data/{0}/{1}.db"
DOCS_BASE_URL = "http://www.nber.org/databases/macrohistory/rectdata/{0}/docs/{1}.txt"
EXPORT_DATA_DIRECTORY = 'cleaned_files'
HTTP_RESPONSE_SUCCESS_CODE_PREFIX = '2'
MAX_RETRIES = 3
MONTHS_PER_QUARTER = 3
QUARTERS_PER_YEAR = 4
MONTHS_PER_YEAR = 12
RAW_DATA_DIRECTORY = 'raw_files'
ROOT_URL = "http://www.nber.org/databases/macrohistory/contents/chapter{0}.html"
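# e.g. DATA_BASE_URL.format('01', 'a01042a') gives
# http://www.nber.org/databases/macrohistory/data/01/a01042a.db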


def download_file(url, file_name, file_type):
    file_path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.' + file_type)
    if os.path.exists(file_path):
        return None
    response = None
    for _ in range(MAX_RETRIES):
        try:
            response = requests.get(url)
            break
        except requests.RequestException:
            sleep(1 + random())
    # check for None before touching status_code; all retries may have failed
    if response is None or not str(response.status_code).startswith(
            HTTP_RESPONSE_SUCCESS_CODE_PREFIX):
        last_code = 'no response' if response is None else response.status_code
        print(f"Failed to download {url}. Last response code: {last_code}")
        return None
    with open(file_path, 'w+') as f_open:
        f_open.write(response.text)


def period_to_month(period, observations_per_year):
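    # worked example: quarterly period 3 maps to month 7, since Q3 begins in July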
    if observations_per_year == QUARTERS_PER_YEAR:
        return (period - 1) * MONTHS_PER_QUARTER + 1
    return period


def db_period_to_timestamp(db_period, observations_per_year):
    # returns beginning of period date
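    # e.g. '1914.2' with quarterly data -> pd.Timestamp(1914, 4, 1)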
    db_components = db_period.split('.')
    year = int(db_components[0])
    day = 1
    if len(db_components) > 1 and db_components[1]:
        month = period_to_month(int(db_components[1]), observations_per_year)
    else:
        month = 1
    return pd.Timestamp(year, month, day)


def safe_float_conversion(num_string):
    # there are some gibberish values (looks like file corruption)
    # that must be dropped
    try:
        return float(num_string)
    except (TypeError, ValueError):
        return float('nan')


def microtsp_db_to_dataframe(file_name):
    """
    Convert a MicroTSP-style .db file to a tidy pandas DataFrame.
    file format: https://en.wikipedia.org/wiki/Databank_format
    example file: http://www.nber.org/databases/macrohistory/data/01/a01042a.db
    """
    file_path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.db')
    with open(file_path, encoding='iso-8859-1') as f_open:
        db_lines = f_open.readlines()
    db_lines = [x.strip() for x in db_lines if not x.startswith('"')]
    db_lines = [x for x in db_lines if x != '']
    try:
        observations_per_year = db_lines.pop(0)
        observations_per_year = abs(int(observations_per_year.strip('.')))
        start_date = db_lines.pop(0)
        start_date = db_period_to_timestamp(start_date, observations_per_year)
        # discard the end date; it's implied by the start date and row count
        db_lines.pop(0)
    except (IndexError, ValueError):
        # some files are malformed, skip them rather than trying to fix.
        return None
    if len(db_lines) == 0:
        return None
    date_range = pd.date_range(start=start_date, periods=len(db_lines),
                               freq=f'{int(MONTHS_PER_YEAR / observations_per_year)}M')
    df = pd.DataFrame(db_lines, index=date_range, columns=['Value'])
    df.index.name = 'Date'
    df.reset_index(inplace=True)
    df['Variable'] = file_name
    df['Value'] = df['Value'].apply(safe_float_conversion)
    return df


def get_chapter_urls(chapter):
    # the second table on the chapter contents page holds the file listing
    df = pd.read_html(ROOT_URL.format(chapter))[1]
    df.columns = ['db', 'dat', 'doc', 'file_name', 'description']
    df['data_url'] = df['file_name'].apply(lambda x: DATA_BASE_URL.format(chapter, x))
    df['doc_url'] = df['file_name'].apply(lambda x: DOCS_BASE_URL.format(chapter, x))
    return df[['file_name', 'data_url', 'doc_url', 'description']]


def download_chapter(chapter, file_descriptions):
    chapter_metadata = get_chapter_urls(chapter)
    chapter_metadata.apply(lambda row: download_file(
        row['doc_url'], row['file_name'], 'txt'), axis=1)
    chapter_metadata.apply(lambda row: download_file(
        row['data_url'], row['file_name'], 'db'), axis=1)
    if file_descriptions is None:
        return chapter_metadata[['file_name', 'description']].copy()
    # DataFrame.append was removed in pandas 2.0; concat is the replacement
    return pd.concat(
        [file_descriptions, chapter_metadata[['file_name', 'description']]])


def prepare_folders():
    if not os.path.exists(RAW_DATA_DIRECTORY):
        os.mkdir(RAW_DATA_DIRECTORY)
    if not os.path.exists(EXPORT_DATA_DIRECTORY):
        os.mkdir(EXPORT_DATA_DIRECTORY)


def download_all_chapters():
    file_descriptions = None
    for chapter in CHAPTERS:
        file_descriptions = download_chapter(chapter, file_descriptions)
        print(f'Chapter {chapter} download complete')
    file_desc_path = os.path.join(EXPORT_DATA_DIRECTORY, "file_descriptions.csv")
    file_descriptions.to_csv(file_desc_path, index=False)


def process_raw_db_files():
    # aggregate the .db files into one large csv
    db_files = [x for x in os.listdir(RAW_DATA_DIRECTORY) if x.endswith('.db')]
    db_files = [x.split('.')[0] for x in db_files]
    db_files = [microtsp_db_to_dataframe(x) for x in db_files]
    db_files = [x for x in db_files if x is not None]
    df = pd.concat(db_files, ignore_index=True)
    # date_range produced month-end stamps; snap every date to the first of the month
    df.Date = df.Date.apply(lambda x: x.replace(day=1))
    df.to_csv(os.path.join(EXPORT_DATA_DIRECTORY, 'data.csv'), index=False)


def load_doc_file(file_name):
    path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.txt')
    with open(path) as f_open:
        doc = f_open.read()
    return doc


def process_raw_doc_files():
    # aggregate all docs into a csv
    df = pd.read_csv(os.path.join(EXPORT_DATA_DIRECTORY, "file_descriptions.csv"))
    # original documentation format has a lot of cruft to be trimmed
    df['documentation'] = df['file_name'].apply(load_doc_file)
    df['documentation'] = df['documentation'].str.replace(r'"c\s+', '', regex=True)
    df['documentation'] = df['documentation'].str.replace(r'\.{2,}', '', regex=True)
    df['documentation'] = df['documentation'].str.replace(r'\s*"\n', '\n', regex=True)
    df['chapter'] = df['file_name'].str.extract(r'(\d{2})\d+', expand=False)
    df.to_csv(os.path.join(EXPORT_DATA_DIRECTORY, "documentation.csv"), index=False)


def prepare_data(should_download_data):
    prepare_folders()
    if should_download_data:
        download_all_chapters()
    process_raw_db_files()
    process_raw_doc_files()


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--download-data", help="Download the data files?",
                        action="store_true")
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    prepare_data(should_download_data=args.download_data)
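
# Expected outputs in cleaned_files/, per the functions above:
#   file_descriptions.csv - file_name, description
#   data.csv              - Date, Value, Variable (one row per observation)
#   documentation.csv     - file_descriptions columns plus documentation, chapter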