@SohierDane, created October 17, 2017 23:36

Downloads, reformats, and cleans the US Census Bureau's Business & Industry reports.
"""
Download all files from
https://www.census.gov/econ/currentdata/datasets/index
and repackage into a data.csv, metadata.csv, and notes.txt
"""
import argparse
import os
import re
from io import BytesIO, StringIO
from random import random
from shutil import rmtree
from time import sleep
from zipfile import ZipFile

import pandas as pd
import requests

BASE_DOWNLOAD_URL = 'https://www.census.gov/econ/currentdata/datasets/{0}.zip'
CENSUS_SECTION_DELIMITER = '\n\n\n'
EXPORT_DATA_DIRECTORY = 'cleaned_files'
HTTP_RESPONSE_SUCCESS_CODE_PREFIX = '2'
MAX_RETRIES = 3
MONTHS_PER_QUARTER = 3
RAW_DATA_DIRECTORY = 'raw_files'
ROOT_URL = 'https://www.census.gov/econ/currentdata/datasets/index'
# unit descriptions were copied from one of the Census readme files
UNITS = {
    '%PTS': 'Percentage Points',
    'BLN$': 'Billions of Dollars',
    'CENTS': 'Cents',
    'CP$': 'Cents per dollar',
    'DOL': 'Dollars',
    'K': 'Thousands of Units',
    'K$': 'Thousands of Dollars',
    'MLN$': 'Millions of Dollars',
    'MO': 'Number of Months',
    'PCT': 'Percent',
    'RATIO': 'Ratio',
    'UNITS': 'Units'
}
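
# For example, a dt_unit or et_unit code of 'MLN$' is rendered as
# 'Millions of Dollars' in the exported metadata.csv (see
# clean_and_export_dataset below).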


def download_failed(response):
    return not str(response.status_code).startswith(HTTP_RESPONSE_SUCCESS_CODE_PREFIX)


def download_zip(file_name):
    file_path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.csv')
    if os.path.exists(file_path):
        return None
    url = BASE_DOWNLOAD_URL.format(file_name)
    response = None
    for _ in range(MAX_RETRIES):
        try:
            response = requests.get(url)
            break
        except requests.RequestException:
            sleep(1 + random())
    if response is None or download_failed(response):
        last_code = response.status_code if response is not None else 'no response'
        print(f"Failed to download {url}. Last response code: {last_code}")
        return None
    ZipFile(BytesIO(response.content)).extract(file_name + '.csv', RAW_DATA_DIRECTORY)


def prepare_folders(force_download):
    # force downloads by wiping any existing raw data
    if force_download and os.path.exists(RAW_DATA_DIRECTORY):
        rmtree(RAW_DATA_DIRECTORY)
    if not os.path.exists(RAW_DATA_DIRECTORY):
        os.mkdir(RAW_DATA_DIRECTORY)
    if not os.path.exists(EXPORT_DATA_DIRECTORY):
        os.mkdir(EXPORT_DATA_DIRECTORY)


def download_all_zips():
    website = pd.read_html(ROOT_URL)[0]
    website['File'] = website['File'].apply(lambda x: re.sub(r'\.zip$', '', x))
    website['File'].apply(download_zip)
    website.rename(columns={'Report/Survey': 'report'}, inplace=True)
    website.to_csv(os.path.join(RAW_DATA_DIRECTORY, 'metadata.csv'), index=False)


def period_to_BOP_date(period):
    """
    Convert a reporting period to a beginning-of-period (BOP) date.

    BOP dates are used to ensure all time series can be aligned regardless
    of frequency. Pandas can natively parse the monthly and annual period
    strings, but quarters look like 'Q22009' and require special handling.
    """
    is_quarter = bool(re.search(r'^Q\d{5}$', period))
    if is_quarter:
        quarter = int(period[1])
        month = str(((quarter - 1) * MONTHS_PER_QUARTER) + 1)
        year = period[2:]
        period = month + '-' + year
    return pd.to_datetime(period)
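
# Example: 'Q22009' is the second quarter of 2009, so the starting month is
# (2 - 1) * 3 + 1 = 4 and the function returns Timestamp('2009-04-01').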


def repackage_file(file_name):
    with open(os.path.join(RAW_DATA_DIRECTORY, file_name)) as f_open:
        raw_text = f_open.read()
    # the notes sections are badly formatted and often contain the \n\n\n
    # section delimiter internally, so pull them out before splitting
    notes_pattern = r'(?s)NOTES\n.+(?=\n\n\nDATA)'
    notes = '\n\n' + file_name + ':\n' + re.search(notes_pattern, raw_text)[0]
    sections = re.sub(notes_pattern, '', raw_text).split(CENSUS_SECTION_DELIMITER)
    dataframes = []
    for section in sections:
        try:
            df = pd.read_csv(StringIO(section), skiprows=1, dtype=object)
            if not df.empty:
                dataframes.append(df)
        except (pd.errors.ParserError, pd.errors.EmptyDataError):
            continue
    # the final section holds the data; the earlier ones are lookup tables
    # keyed by their first column (e.g. per_idx)
    core_df = dataframes.pop(-1)
    dataframes = {x.columns[0]: x for x in dataframes}
    for idx_column, dataframe in dataframes.items():
        dataframe.set_index(idx_column, inplace=True)
    periods = dataframes['per_idx']
    periods['bop_date'] = periods['per_name'].apply(period_to_BOP_date)
    core_df['date'] = core_df['per_idx'].apply(lambda x: periods['bop_date'].loc[x])
    del dataframes['per_idx']
    for idx_column, dataframe in dataframes.items():
        core_df = core_df.merge(dataframe, how='left', left_on=idx_column, right_index=True)
    core_df = core_df[[col for col in core_df.columns if not col.endswith('_idx')]]
    return core_df, notes
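
# Note: repackage_file assumes the layout of the Census CSV exports at the
# time of writing: a NOTES block plus several lookup sections and a final
# DATA section, separated by three newlines, with each section opening on a
# one-line header that read_csv skips via skiprows=1.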


def repackage_raw_files():
    metadata = pd.read_csv(os.path.join(RAW_DATA_DIRECTORY, 'metadata.csv'))
    metadata.set_index('File', inplace=True)
    notes = ''
    dfs = []
    files_to_process = [x for x in os.listdir(RAW_DATA_DIRECTORY)
                        if x.endswith('csv') and not x.startswith('metadata')]
    for raw_file in files_to_process:
        df, note = repackage_file(raw_file)
        notes += note
        df['report'] = metadata['report'].loc[re.sub(r'\.csv$', '', raw_file)]
        dfs.append(df)
    with open(os.path.join(EXPORT_DATA_DIRECTORY, 'notes.txt'), 'w') as f_open:
        f_open.write(notes)
    return pd.concat(dfs)


def clean_and_export_dataset(df):
    df.rename(columns={'val': 'value', 'cat_indent': 'category_level'}, inplace=True)
    for column in df.select_dtypes(include=['object']).columns:
        df[column] = df[column].str.strip()
    df['adj'] = df['is_adj'].apply(lambda x: '_adj' if x == '1' else '')
    df['detail_code'] = df['dt_code'].fillna(value='') + df['et_code'].fillna(value='')
    df['time_series_code'] = (df['cat_code'] + '_' + df['detail_code'] + '_' +
                              df['geo_code'] + df['adj'])
    del df['adj']
    # remap the error code '(z)' to 'Less than .05 percent' based on one of
    # the notes sections. Only applies to 2 time series.
    df['value'] = df['value'].str.replace(r'\(z\)', 'Less than .05 percent', regex=True)
    data_columns = ['time_series_code', 'date', 'value']
    df[data_columns].to_csv(os.path.join(EXPORT_DATA_DIRECTORY, 'data.csv'), index=False)
    df = df[[col for col in df.columns if col not in ['date', 'value']]]
    df['dt_unit'] = df['dt_unit'].apply(lambda x: UNITS.get(x, ''))
    df['et_unit'] = df['et_unit'].apply(lambda x: UNITS.get(x, ''))
    df.drop_duplicates(keep='first').to_csv(
        os.path.join(EXPORT_DATA_DIRECTORY, 'metadata.csv'), index=False)
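
# Illustrative (hypothetical) example: a row with cat_code '44X72', dt_code
# 'SM', no et_code, geo_code 'US', and is_adj '1' would get the
# time_series_code '44X72_SM_US_adj'.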


def prepare_data(force_download):
    prepare_folders(force_download)
    download_all_zips()
    df = repackage_raw_files()
    clean_and_export_dataset(df)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--force-download",
                        help="Wipe and re-download the data files?",
                        action="store_true")
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    prepare_data(force_download=args.force_download)