@dwreeves
Last active May 17, 2020 00:51
Module for `import_census_tract_data`, a function that allows for easy import of Census data into a pandas DataFrame.
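The gist consists of two files. The first, `definitions.py` (imported below as `.definitions`), holds the project constants: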
"""
Definitions of various constants. Adjust based on project structure and needs.
"""
import os
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Files and Directories
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# dirs
# ----
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
DATA_DIR = os.path.join(ROOT_DIR, 'data')
CENSUS_CACHE_DIR = os.path.join(DATA_DIR, 'census_cache')
# other files
# -----------
CENSUS_KEY_FILE = os.path.join(ROOT_DIR, 'census_key.txt')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Data definitions
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
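# American Community Survey (ACS) variable codes mapped to the short column names used in the
# resulting DataFrame. Adjust the mapping to pull different series.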
CENSUS_SERIES = {
    'B19013_001E': 'MedianHHIncome',
    'B19083_001E': 'Gini',
    'B01002_001E': 'MedianAge',
    'B01003_001E': 'Pop',
    'B02001_002E': 'WhitePop',
    'B02001_003E': 'BlackPop',
    'B03001_001E': 'HispPop',
    'B05002_013E': 'ForeignPop',
    'B25007_001E': 'Units',
    'B25007_002E': 'OwnerOccupiedUnits',
    'B25007_012E': 'RenterOccupiedUnits',
    'B15003_001E': 'TotalEduc',
    'B15003_022E': 'BachelorsEduc',
    'B15003_023E': 'MastersEduc',
    'B15003_024E': 'ProfessionalEduc',
    'B15003_025E': 'DoctorateEduc'
}
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Options
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DEFAULT_YEAR = 2015
USE_CACHE_DEFAULT = True
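The second file is the helper module itself, which defines `import_census_tract_data` and the caching machinery around the Census API: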
"""
This file contains helper functions for importing Census data.
"""
import os
import ast
import json
import collections.abc
import us
import numpy as np
import pandas as pd
from census import Census
from .definitions import ROOT_DIR, DEFAULT_YEAR, CENSUS_CACHE_DIR, CENSUS_KEY_FILE, \
    CENSUS_SERIES, USE_CACHE_DEFAULT
CACHE_DIR = CENSUS_CACHE_DIR
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Connect to Census API
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
try:
    _ckey = open(CENSUS_KEY_FILE).readline().rstrip()
    _c = Census(_ckey)
except Exception as e:
    import warnings
    warnings.warn(f'Could not connect to the Census API: {e}')
    _c = None
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _cache_file(i, dir_path=CACHE_DIR):
    """Return the path of the i-th cached JSON response."""
    return os.path.join(dir_path, f'{str(i).zfill(7)}.json')
def reorder_columns(df, start_cols=[], end_cols=[]):
    """
    Reorder columns in a pd.DataFrame so that some are moved to the front or the back.
    """
    reorder = start_cols + [c for c in df.columns if c not in start_cols + end_cols] + end_cols
    return df[reorder]
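# Illustrative example (hypothetical columns):
#   reorder_columns(df, start_cols=['tract'], end_cols=['Pop'])
# puts 'tract' first and 'Pop' last, leaving every other column in its original order.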
def cache_request(func):
    """
    Decorator that caches all Census requests to disk.
    """
    def _wrapped_func(*args, **kwargs):
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Check to see if we should be using cache
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if not kwargs.get('use_cache', USE_CACHE_DEFAULT):
            return func(*args, **kwargs)
        cache_dir = kwargs.get('cache_dir', CACHE_DIR)
        # ~~~~~~~~~~~~~
        # Load up cache
        # ~~~~~~~~~~~~~
        os.makedirs(cache_dir, exist_ok=True)
        cache_index_file = os.path.join(cache_dir, '_.txt')
        if not os.path.exists(cache_index_file):
            open(cache_index_file, 'w').close()
        check_args = args[1:]  # Exclude census.Census obj
        check_args += (func.__name__,)
        check_kwargs = kwargs  # Aliases kwargs, so the pops below also strip these keys before calling func.
        check_kwargs.pop('use_cache', None)
        check_kwargs.pop('cache_dir', None)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Check cache for previous request; if exists, use it
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        i = -1
        with open(cache_index_file, 'r') as cache_index:
            for i, row in enumerate(cache_index):
                _args, _kwargs = ast.literal_eval(row.strip())
                # If request was already done, load it
                if check_args == _args and check_kwargs == _kwargs:
                    return json.load(open(_cache_file(i, cache_dir)))
        # If we got here, then the request is new / does not exist in the index file!
        # The new entry is appended as line i + 1 of the index, so store its JSON under that number.
        new_cached_json_file = _cache_file(i + 1, cache_dir)
        # ~~~~~~~~~~~~~~~~~~~~~
        # Save request to cache
        # ~~~~~~~~~~~~~~~~~~~~~
        res = func(*args, **kwargs)
        with open(cache_index_file, 'a') as cache_index:
            cache_index.write(str((check_args, check_kwargs)))
            cache_index.write('\n')
        with open(new_cached_json_file, 'w+') as new_cached_json:
            new_cached_json.write(json.dumps(res))
        return res
    return _wrapped_func
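# On-disk cache layout produced by `cache_request`: `_.txt` in the cache directory is an index
# whose n-th line is the repr of an (args, kwargs) request signature, and the JSON response for
# that request is stored alongside it as a zero-padded `n.json` file (see `_cache_file`).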
# Dev status: almost complete; work on processing.
def import_census_tract_data(c=_c, census_series=CENSUS_SERIES, year=DEFAULT_YEAR, state=None,
                             raw=False, shape='wide', use_cache=False, **kwargs) -> pd.DataFrame:
    """
    Returns processed Census data. Processing kwargs are passed through **kwargs.

    :param c: (census.Census) Census API wrapper object.
    :param census_series: (dict or list) If dict, keys are the names of Census series and values
                          rename the series. If list, then just names of Census series.
    :param year: (int or iterable) The year(s) to pull Census data for.
    :param state: (str) The state for which you want to get tract data. Takes state abbreviation,
                  state name, or FIPS (as 2-digit string) as valid inputs. If None, returns data
                  for all states.
    :param raw: (bool) If True, returns the unprocessed data; if False, processes the data before
                returning it.
    :param shape: (str) 'wide' or 'long'. Only matters if year is an iterable. If wide, years are
                  appended to column names and each row is a single tract. If long, creates a
                  column for year, and each row is a single tract + year combination.
    :param use_cache: (bool) If True, uses caching to limit the number of requests to the API.
    :returns: (pd.DataFrame)
    """
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Logic for handling multi-year inputs
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if isinstance(year, collections.abc.Iterable) and not isinstance(year, str):
        base_cols = ['tract', 'county', 'state'] + (['year'] if shape == 'long' else [])
        df = pd.DataFrame([], columns=base_cols)
        for yr in year:
            _df = import_census_tract_data(c=c, census_series=census_series, year=yr, state=state,
                                           raw=raw, use_cache=use_cache, **kwargs)
            if shape == 'wide':
                # Suffix the year onto each data column, e.g. 'MedianHHIncome2015'.
                _df.columns = [
                    (''.join([col, str(yr)]) if col not in base_cols else col)
                    for col
                    in _df.columns
                ]
                df = df.merge(right=_df, on=base_cols, how='outer')
            elif shape == 'long':
                df = pd.concat([df, _df], sort=False)
                df.loc[df['year'].isna(), 'year'] = yr
        df = reorder_columns(df, start_cols=base_cols)
        return df
    # ~~~~~~~~
    # Get data
    # ~~~~~~~~
    kwargs_import = dict(c=c, year=int(year), census_series=census_series, use_cache=use_cache)
    if state is not None:
        df = _get_state_level_tract_data(**kwargs_import, state=state)
    else:
        df = _get_tract_data_for_all_states(**kwargs_import)
    # recast tract as int64
    df['tract'] = df['tract'].astype(np.int64)
    # ~~~~~~~~~~~~~~~~~~~~~~
    # Process or return data
    # ~~~~~~~~~~~~~~~~~~~~~~
    if raw:
        return df
    else:
        return process_census_tract_data(df, **kwargs)
def process_census_tract_data(df, **kwargs):
    """
    Takes an unprocessed dataframe and returns a processed dataframe. See the code for how the data
    is processed.
    """
    # Scale Population and Household Unit vars to be shares instead of absolute amounts.
    # The totals themselves ('Pop' and 'Units') are left as absolute counts.
    for c in df.columns:
        if c.find('Pop') >= 0 and c != 'Pop':
            df[c] /= df['Pop']
    for c in df.columns:
        if c.find('Units') >= 0 and c != 'Units':
            df[c] /= df['Units']
    # Education related statistics.
    try:
        postgrad_educ = ['MastersEduc', 'ProfessionalEduc', 'DoctorateEduc']
        df['PostgradPercent'] = df[postgrad_educ].sum(axis=1) / df['TotalEduc']
        df['CollegePercent'] = df[postgrad_educ + ['BachelorsEduc']].sum(axis=1) / df['TotalEduc']
    except KeyError:
        # The education columns are not present (e.g. a custom census_series was passed).
        pass
    # Reorder columns
    df = reorder_columns(df, start_cols=['tract', 'county', 'state'])
    return df
@cache_request
def request_census_tract_data(c, *args, use_cache=USE_CACHE_DEFAULT, **kwargs):
    """
    Returns the raw response from the Census API for tract-level data. This function uses caching
    to reduce the number of times the Census API needs to be accessed. The cache can be turned off
    with the `use_cache` kwarg. All other kwargs are passed into the `c.acs5.state_county_tract`
    method.
    """
    return c.acs5.state_county_tract(*args, **kwargs)
def _get_tract_data_for_all_states(c, year=None, census_series=CENSUS_SERIES,
                                   use_cache=USE_CACHE_DEFAULT) -> pd.DataFrame:
    """
    Runs `_get_state_level_tract_data` for all states. Each state's tract-level data is pulled in
    an individual request. The data returned is unprocessed.

    :param c: (census.Census) Census API wrapper object.
    :param year: (int) The year to pull Census data for.
    :param census_series: (dict or list) If dict, keys are the names of Census series and values
                          rename the series. If list, then just names of Census series.
    :param use_cache: (bool) If True, uses caching to limit the number of requests to the API.
    :returns: (pd.DataFrame)
    """
    return (
        pd.concat([_get_state_level_tract_data(c,
                                               census_series=census_series,
                                               state=state.fips,
                                               year=year,
                                               use_cache=use_cache)
                   for state
                   in us.states.STATES],
                  axis=0,
                  ignore_index=True)
        .reset_index(drop=True)
    )
def _get_state_level_tract_data(c, state=None, census_series=CENSUS_SERIES,
                                year=None, use_cache=USE_CACHE_DEFAULT) -> pd.DataFrame:
    """
    Takes a Census API wrapper object and a census_series and returns an unprocessed DataFrame of
    tract-level Census data for a single state.

    :param c: (census.Census) Census API wrapper object.
    :param state: (str) The state for which you want to get tract data. Takes state abbreviation,
                  state name, or FIPS (as 2-digit string) as valid inputs.
    :param census_series: (dict or list) If dict, keys are the names of Census series and values
                          rename the series. If list, then just names of Census series.
    :param year: (int) The year to pull Census data for.
    :param use_cache: (bool) If True, uses caching to limit the number of requests to the API.
    :returns: (pd.DataFrame)
    """
    args = [
        # fields
        list(census_series),
        # state_fips
        us.states.lookup(state).fips,
        # county_fips
        Census.ALL,
        # tract
        Census.ALL
    ]
    req = request_census_tract_data(c, *args, year=year, use_cache=use_cache)
    df = pd.DataFrame(req)
    if isinstance(census_series, dict):
        df = df.rename(columns=census_series)
    return df
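
# A minimal usage sketch (not part of the original gist). It assumes a valid Census API key in
# `census_key.txt` at the project root and network access; the state and years below are arbitrary
# examples. Because the module uses a relative import, run it as `python -m <package>.<module>`.
if __name__ == '__main__':
    # Single state and year, processed (subgroup population and unit counts become shares).
    df_single = import_census_tract_data(state='RI', year=2015, use_cache=True)
    print(df_single.head())

    # Two years in long format: one row per tract + year combination.
    df_long = import_census_tract_data(state='RI', year=[2014, 2015], shape='long',
                                       use_cache=True)
    print(df_long[['tract', 'county', 'state', 'year']].head())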