Last active
May 17, 2020 00:51
-
-
Save dwreeves/ecd7b1e278241df3219d5243d0c6117e to your computer and use it in GitHub Desktop.
Module for `import_census_tract_data`, a function that allows for easy import of Census data into a Pandas DataFrame format.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Definitions of various constants. Adjust based on project structure and needs. | |
""" | |
import os | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Files and Directories | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# dirs | |
# ---- | |
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) | |
DATA_DIR = os.path.join(ROOT_DIR, 'data') | |
CENSUS_CACHE_DIR = os.path.join(DATA_DIR, 'census_cache') | |
# other files | |
# ----------- | |
CENSUS_KEY_FILE = os.path.join(ROOT_DIR, 'census_key.txt') | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Data definitions | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
CENSUS_SERIES = { | |
'B19013_001E': 'MedianHHIncome', | |
'B19083_001E': 'Gini', | |
'B01002_001E': 'MedianAge', | |
'B01003_001E': 'Pop', | |
'B02001_002E': 'WhitePop', | |
'B02001_003E': 'BlackPop', | |
'B03001_001E': 'HispPop', | |
'B05002_013E': 'ForeignPop', | |
'B25007_001E': 'Units', | |
'B25007_002E': 'OwnerOccupiedUnits', | |
'B25007_012E': 'RenterOccupiedUnits', | |
'B15003_001E': 'TotalEduc', | |
'B15003_022E': 'BachelorsEduc', | |
'B15003_023E': 'MastersEduc', | |
'B15003_024E': 'ProfessionalEduc', | |
'B15003_025E': 'DoctorateEduc' | |
} | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Options | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
DEFAULT_YEAR = 2015 | |
USE_CACHE_DEFAULT = True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This file contains helper functions for importing Census data. | |
""" | |
import os | |
import ast | |
import json | |
import collections | |
import us | |
import numpy as np | |
import pandas as pd | |
from census import Census | |
import us | |
from .definitions import ROOT_DIR, DEFAULT_YEAR, CENSUS_CACHE_DIR, CENSUS_KEY_FILE, \ | |
CENSUS_SERIES, USE_CACHE_DEFAULT | |
CACHE_DIR = CENSUS_CACHE_DIR | |
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Connect to Census API
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
try:
    # Read the API key from the first line of the key file; `with` guarantees the
    # handle is closed even if reading fails (the old code leaked the file object).
    with open(CENSUS_KEY_FILE) as _key_file:
        _ckey = _key_file.readline().rstrip()
    _c = Census(_ckey)
except Exception as e:
    # Deliberate best-effort: the module stays importable without a key or API
    # access; callers must then pass their own census.Census object explicitly.
    import warnings
    # warnings.warn expects a str (or Warning instance); pass str(e) rather than
    # the raw exception object so the warning text is well-formed.
    warnings.warn(str(e))
    _c = None
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
def _cache_file(i, dir_path=CACHE_DIR):
    # BUG FIX: the signature was missing its trailing colon (a syntax error).
    """Return the path of the cached JSON file for index *i*, zero-padded to 7 digits."""
    return os.path.join(dir_path, f'{str(i).zfill(7)}.json')
def reorder_columns(df, start_cols=None, end_cols=None):
    """
    Reorder columns in a pd.DataFrame so that some are moved to the front or the back.

    :param df: (pd.DataFrame) Frame whose columns should be reordered.
    :param start_cols: (list or None) Columns to move to the front, in the given order.
    :param end_cols: (list or None) Columns to move to the back, in the given order.
    :returns: (pd.DataFrame) *df* restricted/reordered to start_cols + middle + end_cols.
    """
    # BUG FIX: the defaults were mutable lists ([]), which are shared across calls.
    start_cols = list(start_cols) if start_cols is not None else []
    end_cols = list(end_cols) if end_cols is not None else []
    middle = [c for c in df.columns if c not in start_cols + end_cols]
    return df[start_cols + middle + end_cols]
def cache_request(func):
    """
    Decorator that caches Census requests on disk.

    Requests are tracked in an index file (``_.txt``) inside the cache directory:
    row *i* of the index holds the repr of the request's cache key, and the matching
    JSON response lives in the file named by ``_cache_file(i)``.

    The wrapped function understands two extra kwargs, which are stripped before
    the real request is made:

    * ``use_cache``: (bool) if falsy, bypass the cache entirely (default USE_CACHE_DEFAULT).
    * ``cache_dir``: (str) directory holding the cache (default CACHE_DIR).
    """
    import functools

    @functools.wraps(func)
    def _wrapped_func(*args, **kwargs):
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Check to see if we should be using cache
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # BUG FIX: the default here was CACHE_DIR (a truthy path string), not the
        # intended boolean USE_CACHE_DEFAULT.
        if not kwargs.get('use_cache', USE_CACHE_DEFAULT):
            return func(*args, **kwargs)
        cache_dir = kwargs.get('cache_dir', CACHE_DIR)
        # Tolerate a missing cache directory instead of crashing on first use.
        os.makedirs(cache_dir, exist_ok=True)
        # ~~~~~~~~~~~~~
        # Load up cache
        # ~~~~~~~~~~~~~
        cache_index_file = os.path.join(cache_dir, '_.txt')
        if not os.path.exists(cache_index_file):
            open(cache_index_file, 'w').close()
        # Cache key: everything except the census.Census object (args[0]) and the
        # caching-control kwargs, plus the function name.
        check_args = args[1:] + (func.__name__,)
        # Intentionally alias kwargs: popping here also removes the control kwargs
        # from the eventual func(*args, **kwargs) call below.
        check_kwargs = kwargs
        check_kwargs.pop('use_cache', None)
        check_kwargs.pop('cache_dir', None)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Check cache for previous request; if exists, use it
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        i = -1
        with open(cache_index_file, 'r') as cache_index:
            for i, row in enumerate(cache_index):
                # Index rows are reprs of (args, kwargs); literal_eval is safe on them.
                _args, _kwargs = ast.literal_eval(row.strip())
                # If request was already done, load it
                if check_args == _args and check_kwargs == _kwargs:
                    with open(_cache_file(i, cache_dir)) as cached_json:
                        return json.load(cached_json)
        # If we got here, the request is new / does not exist in the index file.
        # BUG FIX: `i` is the index of the LAST existing row (-1 for an empty index),
        # and the new row is appended at position i + 1 — so the payload must be
        # written to _cache_file(i + 1), not _cache_file(i).
        new_cached_json_file = _cache_file(i + 1, cache_dir)
        # ~~~~~~~~~~~~~~~~~~~~~
        # Save request to cache
        # ~~~~~~~~~~~~~~~~~~~~~
        res = func(*args, **kwargs)
        with open(cache_index_file, 'a') as cache_index:
            cache_index.write(str((check_args, check_kwargs)))
            cache_index.write('\n')
        with open(new_cached_json_file, 'w+') as new_cached_json:
            new_cached_json.write(json.dumps(res))
        return res
    return _wrapped_func
# Dev status: almost complete; work on processing. | |
def import_census_tract_data(c=_c, census_series=CENSUS_SERIES, year=DEFAULT_YEAR, state=None,
                             raw=False, shape='wide', use_cache=False, **kwargs) -> pd.DataFrame:
    """
    Returns processed Census data. Processing kwargs are passed through **kwargs.
    :param c: (census.Census) Census API wrapper object.
    :param census_series: (dict or list) If dict, keys are the names of Census series names and
                          values rename the series. If list, then just names of Census series.
    :param year: (int or iterable) The year(s) to pull Census data for.
    :param state: (str) The state for which you want to get tract data. Takes state abbreviation,
                  state name, or FIPS (as 2-digit string) as valid inputs. If None, returns data
                  for all states.
    :param raw: (bool) If True, returns the data unprocessed; if False, runs it through
                `process_census_tract_data` first. (The old docstring had this inverted.)
    :param shape: (str) 'wide' or 'long'. Only matters if year is an iterable. If wide, years are
                  appended to column names and each row is a single tract. If long, creates a
                  column for year, and each row is a single tract + year combination.
    :param use_cache: (bool) If True, uses caching to limit the amount of requests to the API.
    :returns: (pd.DataFrame)
    """
    # BUG FIX: collections.Iterable was removed in Python 3.10; use collections.abc.
    from collections.abc import Iterable
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Logic for handling multi-year inputs
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    if isinstance(year, Iterable) and not isinstance(year, str):
        base_cols = ['tract', 'county', 'state'] + (['year'] if shape == 'long' else [])
        df = pd.DataFrame([], columns=base_cols)
        for yr in year:
            _df = import_census_tract_data(c=c, census_series=census_series, year=yr, state=state,
                                           raw=raw, use_cache=use_cache, **kwargs)
            if shape == 'wide':
                # Suffix each data column with the year so per-year merges don't clash.
                _df.columns = [
                    (''.join([col, str(yr)]) if col not in base_cols else col)
                    for col in _df.columns
                ]
                df = df.merge(right=_df, on=base_cols, how='outer')
            elif shape == 'long':
                # BUG FIX: DataFrame.append was removed in pandas 2.0; pd.concat is
                # the drop-in equivalent.
                df = pd.concat([df, _df], sort=False)
                # Rows just added have no year yet; stamp them with this iteration's year.
                df.loc[df['year'].isna(), 'year'] = yr
        df = reorder_columns(df, start_cols=base_cols)
        return df
    # ~~~~~~~~
    # Get data
    # ~~~~~~~~
    kwargs_import = dict(c=c, year=int(year), census_series=census_series)
    if state is not None:
        # BUG FIX: use_cache was previously dropped on this path, so the kwarg was
        # silently ignored and the helper's own default applied instead.
        df = _get_state_level_tract_data(**kwargs_import, state=state, use_cache=use_cache)
    else:
        # NOTE(review): use_cache is not forwarded here because
        # _get_tract_data_for_all_states does not accept it.
        df = _get_tract_data_for_all_states(**kwargs_import)
    # recast tract as int64
    df['tract'] = df['tract'].astype(np.int64)
    # ~~~~~~~~~~~~~~~~~~~~~~
    # Process or return data
    # ~~~~~~~~~~~~~~~~~~~~~~
    if raw:
        return df
    else:
        return process_census_tract_data(df, **kwargs)
def process_census_tract_data(df, **kwargs):
    """
    Takes an unprocessed dataframe and returns a processed dataframe. See the code for how the
    data is processed.

    Note: the input frame is modified in place as well as returned. Extra **kwargs are
    accepted for call-site compatibility but are currently unused.
    """
    # Scale Population and Household Unit vars to be percentages instead of absolute amounts.
    # BUG FIX: the old loop divided 'Pop' by itself first (turning it into all 1s), so every
    # subsequent *Pop column was divided by 1 and never scaled (same for 'Units'). Snapshot
    # the denominator before dividing, and leave the total columns as absolute counts.
    if 'Pop' in df.columns:
        pop_total = df['Pop'].copy()
        for c in df.columns:
            if 'Pop' in c and c != 'Pop':
                df[c] /= pop_total
    if 'Units' in df.columns:
        units_total = df['Units'].copy()
        for c in df.columns:
            if 'Units' in c and c != 'Units':
                df[c] /= units_total
    # Education related statistics.
    try:
        postgrad_educ = ['MastersEduc', 'ProfessionalEduc', 'DoctorateEduc']
        df['PostgradPercent'] = df[postgrad_educ].sum(axis=1) / df['TotalEduc']
        df['CollegePercent'] = df[postgrad_educ + ['BachelorsEduc']].sum(axis=1) / df['TotalEduc']
    except KeyError:
        # Education columns are optional; skip these derived stats if any are missing.
        pass
    # Reorder columns
    df = reorder_columns(df, start_cols=['tract', 'county', 'state'])
    return df
@cache_request
def request_census_tract_data(c, *args, use_cache=USE_CACHE_DEFAULT, **kwargs):
    """
    Return the raw response for a Census tract-level data request.

    This function uses caching (via the `cache_request` decorator) to reduce the number of
    times the Census API needs to be accessed; the cache can be turned off with the
    `use_cache` kwarg (which the decorator strips before this body runs). All other args
    and kwargs are passed into the `c.acs5.state_county_tract` method.

    :param c: (census.Census) Census API wrapper object.
    :param use_cache: (bool) If True, serve/record results via the on-disk cache.
    """
    return c.acs5.state_county_tract(*args, **kwargs)
def _get_tract_data_for_all_states(c, year=None, census_series=CENSUS_SERIES,
                                   use_cache=USE_CACHE_DEFAULT) -> pd.DataFrame:
    """
    Runs `_get_state_level_tract_data` for all states. Each state's tract-level data is
    pulled in an individual request. The data returned is unprocessed.

    (The old docstring documented a `record_year_as` parameter that does not exist.)

    :param c: (census.Census) Census API wrapper object.
    :param year: (int) The year to pull Census data for.
    :param census_series: (dict or list) If dict, keys are the names of Census series names and
                          values rename the series. If list, then just names of Census series.
    :param use_cache: (bool) Forwarded to each per-state request; new parameter with a
                      backward-compatible default.
    :returns: (pd.DataFrame)
    """
    state_frames = [
        _get_state_level_tract_data(c,
                                    census_series=census_series,
                                    state=state.fips,
                                    year=year,
                                    use_cache=use_cache)
        for state in us.states.STATES
    ]
    # ignore_index=True already produces a fresh RangeIndex; the old extra
    # .reset_index(drop=True) was a redundant no-op.
    return pd.concat(state_frames, axis=0, ignore_index=True)
def _get_state_level_tract_data(c, state=None, census_series=CENSUS_SERIES,
                                year=None, use_cache=USE_CACHE_DEFAULT) -> pd.DataFrame:
    """
    Return an unprocessed DataFrame of tract-level Census data for a single state.

    :param c: (census.Census) Census API wrapper object.
    :param state: (str) The state for which you want to get tract data. Takes state
                  abbreviation, state name, or FIPS (as 2-digit string) as valid inputs.
    :param census_series: (dict or list) If dict, keys are the names of Census series names and
                          values rename the series. If list, then just names of Census series.
    :param year: (int) The year to pull Census data for.
    :param use_cache: (bool) If True, uses caching to limit the amount of requests to the API.
    :returns: (pd.DataFrame)
    """
    # `list` on a dict yields its keys, so both dict and list inputs give series codes.
    fields = list(census_series)
    state_fips = us.states.lookup(state).fips
    # Request every county and every tract within the state.
    response = request_census_tract_data(c, fields, state_fips, Census.ALL, Census.ALL,
                                         year=year, use_cache=use_cache)
    df = pd.DataFrame(response)
    # When census_series is a dict, rename the raw series codes to friendly names.
    return df.rename(columns=census_series) if isinstance(census_series, dict) else df
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment