Skip to content

Instantly share code, notes, and snippets.

@erikbern
Last active May 1, 2022 18:17
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erikbern/89c5f44bd1354854a8954fa2df04453d to your computer and use it in GitHub Desktop.
Save erikbern/89c5f44bd1354854a8954fa2df04453d to your computer and use it in GitHub Desktop.
Download and parse American Community Survey data using Python
# Uses American Community Survey data to estimate property taxes
# https://www.census.gov/programs-surveys/acs/
# The data is a f-ing PITA to parse, but here's an attempt
import bs4
import csv
import io
import os
import requests
import sys
import urllib.request
import zipfile
def download(src, dst):
if not os.path.exists(dst):
# TODO: should be atomic
print('downloading %s -> %s...' % (src, dst), file=sys.stderr)
urllib.request.urlretrieve(src, dst)
class ACS:
def __init__(self, lookup_url, lookup_fn, data_url, data_dir):
download(lookup_url, lookup_fn)
self._lookup_fn = lookup_fn
if not os.path.exists(data_dir):
os.makedirs(data_dir)
# Go to the "data by state" page and scan the HTML page for links to zip files
soup = bs4.BeautifulSoup(requests.get(data_url).content)
self._data_zips = []
for link in soup.find_all('a'):
if link.get('href') and link.get('href').endswith('zip'):
fn = link.get('href').split('/')[-1]
download(data_url + '/' + fn, os.path.join(data_dir, fn))
self._data_zips.append(zipfile.ZipFile(os.path.join(data_dir, fn), 'r'))
def find_table(self, table_title, subject_area):
with open(self._lookup_fn, 'r', encoding='iso-8859-1') as csvfile:
reader = csv.DictReader(csvfile, dialect='unix')
seq_number, start_pos, cells = None, None, []
current_table_title = None
for row in reader:
if row['Table Title'] and row['Total Cells in Table']:
current_table_title = row['Table Title']
if current_table_title == table_title and row['Start Position']:
seq_number = int(row['Sequence Number'])
start_pos = int(row['Start Position'])
if current_table_title == table_title and row['Line Number']:
try:
int(row['Line Number'])
cells.append(row['Table Title'])
except:
pass
return seq_number, start_pos, cells
def get_geos(self):
geos = {}
for data_zip in self._data_zips:
for info in data_zip.infolist():
if info.filename.startswith('g') and info.filename.endswith('.csv'):
with data_zip.open(info.filename) as csvfile:
print('Parsing geography data for', info.filename, file=sys.stderr)
data = csvfile.read()
buf = io.StringIO(data.decode('iso-8859-1'))
reader = csv.reader(buf, dialect='unix')
for row in reader:
geos[(row[1], row[4])] = row[-4]
return geos
def get_table(self, table_title, subject_area):
seq_number, start_pos, cells = self.find_table(table_title, subject_area)
ret = {}
for data_zip in self._data_zips:
for info in data_zip.infolist():
if info.filename.startswith('e') and info.filename.endswith('%04d000.txt' % seq_number):
with data_zip.open(info.filename) as csvfile:
print('Parsing data for', info.filename, file=sys.stderr)
data = csvfile.read()
buf = io.StringIO(data.decode('iso-8859-1'))
reader = csv.reader(buf, dialect='unix')
col_i, col_j = start_pos-1, start_pos+len(cells)-1
for row in reader:
state = row[2].upper()
logical_record_number = row[5]
values = [int(value) if (value and value != '.' and int(value) > 0) else None for value in row[col_i:col_j]]
ret[(state, logical_record_number)] = {k: v for k, v in zip(cells, values) if v is not None}
return ret
class OneYearACS(ACS):
def __init__(self):
super(OneYearACS, self).__init__(
'https://www2.census.gov/programs-surveys/acs/summary_file/2016/documentation/user_tools/ACS_1yr_Seq_Table_Number_Lookup.txt',
'acs_lookup.txt',
'https://www2.census.gov/programs-surveys/acs/summary_file/2016/data/1_year_by_state',
'1_year_data')
class FiveYearACS(ACS):
def __init__(self):
super(FiveYearACS, self).__init__(
'https://www2.census.gov/programs-surveys/acs/summary_file/2016/documentation/user_tools/ACS_5yr_Seq_Table_Number_Lookup.txt',
'5y_lookup.txt',
'https://www2.census.gov/programs-surveys/acs/summary_file/2016/data/5_year_by_state',
'5_year_data')
if __name__ == '__main__':
# See the summary files for a list of all the datasets that are available
for acs in [OneYearACS(), FiveYearACS()]:
mortgage_data = acs.get_table('MORTGAGE STATUS', 'Housing')
value_data = acs.get_table('MEDIAN VALUE (DOLLARS)', 'Housing')
tax_data = acs.get_table('MORTGAGE STATUS BY MEDIAN REAL ESTATE TAXES PAID (DOLLARS)', 'Housing')
geos = acs.get_geos()
for k, name in geos.items():
tax = tax_data.get(k, {}).get('Median real estate taxes paid for units with a mortgage')
value = value_data.get(k, {}).get('Median value (dollars)')
mortgage = mortgage_data.get(k, {}).get('Housing units with a mortgage, contract to purchase, or similar debt:')
if tax and value and mortgage:
print('%6.2f%% %6d %s' % (100. * tax / value, mortgage, name))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment