@SohierDane
Created August 3, 2017 18:47
CDC Mortality Dataset Preparation 2005-2015
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
For each year, parse the pdf manual, then use that information to
unpack the fixed-width data file.
Source data files can be found here:
https://www.cdc.gov/nchs/data_access/vitalstatsonline.htm#Mortality_Multiple
Passes basic tests for 2005-2015. Untested on earlier years.
"""
import json
import os
import pandas as pd
import pdb
import re
import tabula # https://github.com/chezou/tabula-py, requires JDK8 install
from collections import defaultdict, OrderedDict
from slugify import slugify
from string import digits, whitespace
YEARS_TO_PROCESS = [i for i in range(2005, 2016)[::-1]]
INPUT_DATA_DIR = "raw_data"
EXPORT_DATA_DIR = "cleaned_data"
FIELDS_TO_DROP = {'detail_age'}
FIELD_MAP_PATTERNS_TO_DROP = [r'(?i)(_|\s)condition']
MANUALS_DIR = "file_definitions"
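# Assumed directory layout (inferred from the path constants above and the
# DATA_PATHS/PDF_PATHS filenames below): the fixed-width mortality files sit in
# raw_data/, the record layout PDFs in file_definitions/, and the cleaned CSV and
# JSON outputs are written to cleaned_data/, all relative to the working directory.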
GIBBERISH_SEPARATOR = '>!^@^!<' # should not match anything
UNCODED_FIELDS_TO_KEEP = {r'(?i)ICD_Code.*'}
LAST_PII_YEAR = 2004
ICD_VERSION = {year: 10 for year in range(1999, 2016)}
ICD_VERSION.update({year: 9 for year in range(1979, 1999)})
YEARS_MISSING_ICD_GROUPING = {2003}
DATA_PATHS = {
    2005: 'Mort05uspb.dat',
    2006: 'MORT06.DUSMCPUB',
    2007: 'VS07MORT.DUSMCPUB',
    2008: 'Mort2008us.dat',
    2009: 'VS09MORT.DUSMCPUB',
    2010: 'VS10MORT.DUSMCPUB',
    2011: 'VS11MORT.DUSMCPUB',
    2012: 'VS12MORT.DUSMCPUB',
    2013: 'VS13MORT.DUSMCPUB',
    2014: 'VS14MORT.DUSMCPUB',
    2015: 'VS15MORT.DUSMCPUB',
}
PDF_PATHS = {
    2005: 'Record_Layout_2005.pdf',
    2006: 'Record_Layout_2006.pdf',
    2007: 'Record_Layout_2007.pdf',
    2008: 'Record_Layout_2008.pdf',
    2009: 'Record_Layout_2009.pdf',
    2010: 'Record_Layout_2010.pdf',
    2011: 'Record_Layout_2011.pdf',
    2012: 'Record_Layout_2012.pdf',
    2013: 'Record_Layout_2013.pdf',
    2014: 'Record_Layout_2014.pdf',
    2015: 'multiple_cause_record_layout_2015.pdf',
}
# Detail_Age_Type, Detail_Age are malformed, require special handling
HARDCODED_CODES = {x:
    {'detail_age_type': {1: 'Years', 2: 'Months', 4: 'Days', 5: 'Hours', 6: 'Minutes', 9: pd.np.nan}}
    for x in range(2005, 2016)}
HARDCODED_LOCATIONS = {x: {'detail_age_type': (69, 70), 'detail_age': (70, 73)}
                       for x in range(2005, 2016)}
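# Processing pipeline, summarizing the functions below: for each year,
# 1) scan the record layout PDF to find the column definition and ICD recode pages,
# 2) parse those pages into field locations and value-code mappings,
# 3) read the fixed-width data file with those locations and spot-check a few columns,
# 4) export the rows as CSV and the code mappings as JSON.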
def extract_pdf_section(path, sep=',', pages=None):
    # May log several warnings that can be ignored like:
    # `org.apache.pdfbox.util.PDFStreamEngine processOperator INFO: unsupported/disabled operation`.
    full_path = os.path.join(MANUALS_DIR, path)
    try:
        return tabula.read_pdf(full_path, guess=False, pages=pages,
                               pandas_options={'dtype': object, 'header': None, 'sep': sep, 'engine': 'python'})
    except pd.errors.ParserError:
        pdb.set_trace()
def extract_separate_pdf_pages(path):
    # distinct function as passing any sep at all causes an error with multiple_tables
    full_path = os.path.join(MANUALS_DIR, path)
    try:
        return tabula.read_pdf(full_path, multiple_tables=True, guess=False, pages='all',
                               pandas_options={'dtype': object, 'header': None})
    except pd.errors.ParserError:
        pdb.set_trace()
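# Note: the two wrappers above assume tabula-py's behavior at the time of writing:
# read_pdf returns a single DataFrame by default and a list of per-page DataFrames
# when multiple_tables=True. Later tabula-py releases changed these defaults, so pin
# the version if reproducing this.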
def is_column_definition_page(page_data, year):
    # can't just pop the first column definition page; earlier years
    # actually use a different format that won't generate a false positive
    clean_page_data = page_data.dropna(how='all').copy()
    cleaned_first_line = str(clean_page_data.iloc[0, -1]).strip()
    starts_with_year = cleaned_first_line.endswith(str(year))
    cleaned_second_line = str(clean_page_data.iloc[1, -1]).strip()
    # spacing is one of the main things tabula fails on
    # have to assume zero or multiple spaces where one normally exists
    follows_with_title = bool(safe_re_search(r'(?i)Mortality\s*Multiple\s*Cause-of-Death\s*Public\s*Use\s*Record$',
                                             cleaned_second_line))
    cleaned_third_line = str(clean_page_data.iloc[2, -1]).strip()
    is_summary_page = bool(safe_re_search(r'(?i)List\s*of\s*File\s*Data\s*Elements\s*and\s*Tape\s*Locations',
                                          cleaned_third_line))
    return starts_with_year and follows_with_title and not is_summary_page
def is_icd_ten_group_page(page_data):
    # matches text like 'Tenth Revision 130 Selected Causes of Infant Death Adapted for use by DVS'
    cleaned_first_line = str(page_data.iloc[0, 0]).strip()
    return bool(safe_re_search(r'(?i)Tenth\s*Revision\s*\d{2,3}\s*Selected\s*Causes\s*of\s*(\w+\s*)?Death', cleaned_first_line))
def identify_section_pages(path, year):
    # get page numbers for each section.
    # need 2 passes as multiple_tables does not support the 'sep' option
    column_definition_pages = []
    icd_group_pages = []
    pages = extract_separate_pdf_pages(path)
    have_seen_icd_pages = False
    for page_num, page_data in enumerate(pages):
        if is_column_definition_page(page_data, year):
            column_definition_pages.append(page_num + 1)
        elif is_icd_ten_group_page(page_data):
            icd_group_pages.append(page_num + 1)
            have_seen_icd_pages = True
        elif have_seen_icd_pages:
            # abort after end of icd to avoid pulling territorial column_definition_pages
            break
    return column_definition_pages, icd_group_pages
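# identify_section_pages returns 1-based page numbers (hence page_num + 1), since
# tabula.read_pdf's `pages` argument is 1-indexed while enumerate() starts at 0.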
def extract_pdf(year):
    pdf_path = PDF_PATHS[year]
    column_definition_pages, icd_group_pages = identify_section_pages(pdf_path, year)
    results = [extract_pdf_section(pdf_path, pages=column_definition_pages)]
    # gibberish separator ensures a section is read in as 1 column per page
    results.append(extract_pdf_section(pdf_path, pages=icd_group_pages, sep=GIBBERISH_SEPARATOR))
    return results
def safe_re_search(pattern, text):
    if pd.isnull(text):
        return None
    return re.search(pattern, text)
def is_column_location_row(location_text):
    return bool(safe_re_search(r'^\d{1,5}(-\d{1,5})?', location_text))
def is_encoding_row(code_text):
    return bool(safe_re_search(r'\.{3}.+', code_text))
def is_continuation(field_text):
    return bool(safe_re_search(r'(?i)\s*-\s*Con\.$', field_text))
def is_reserved_position(field_text):
    return bool(safe_re_search(r'(?i)Reserved(\s*Position)?(s)?$', field_text))
def is_condition_entry(field_text):
    return bool(safe_re_search(r'(?i)^\d{1,2}.+\s*Condition$', field_text))
def is_condition_header(field_text):
    return bool(safe_re_search(r'(?i)^\w+\s*-\s*Axis\s*Conditions?$', field_text))
def is_condition_size(field_text):
    return bool(safe_re_search(r'(?i)^Number\s*of\s*\w+\s*-\s*Axis\s*Conditions?$', field_text))
def check_for_duplicate_rows(df):
    duplication_filter = (df.field_or_code.duplicated() & df.is_column_loc)
    if len(df[duplication_filter]) == 0:
        return None
    duplicated_fields = list(df[duplication_filter].field_or_code.unique())
    raise ValueError(f"Duplicate field names: {duplicated_fields}")
def add_basic_column_data(df):
    try:
        df.columns = ['location', 'size', 'field_or_code', 'code_value']
    except ValueError:
        # some pdfs get parsed with only 3 columns. Known issue with 2009 & earlier, exact num unclear
        df.columns = ['location', 'size', 'field_or_code']
        df['code_value'] = df['field_or_code'].apply(lambda x:
            str(x)[str(x).find('...'):].strip() if str(x).find('...') >= 0 else pd.np.nan)
        df['field_or_code'] = df['field_or_code'].apply(lambda x:
            str(x)[:str(x).find('...')].strip() if str(x).find('...') >= 0 else x)
    # tabula errors can introduce quotes. None exist in pdfs checked by hand.
    for column in df.columns:
        df[column] = df[column].apply(lambda x: str(x).strip(whitespace + '"') if not pd.isnull(x) else pd.np.nan)
    df['is_column_loc'] = (df['location'].apply(is_column_location_row) &
                           ~df['size'].apply(pd.isnull))
    df['is_encoding'] = (df['code_value'].apply(is_encoding_row) &
                         ~df['field_or_code'].apply(pd.isnull) & df['size'].apply(pd.isnull))
    return df
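# Illustrative sketch (hypothetical values) of rows after add_basic_column_data, to
# show how layout rows and code rows are told apart:
#   location='20', size='1', field_or_code='Resident Status', code_value=NaN              -> is_column_loc
#   location=NaN,  size=NaN, field_or_code='1',               code_value='... RESIDENTS'  -> is_encoding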
def update_condition_names(df):
    # some condition field names can end up in the code_value column
    # these are migrated over
    malformed_field_filter = (df.is_column_loc & df.field_or_code.isnull())
    df.loc[malformed_field_filter, 'field_or_code'] = df[malformed_field_filter].code_value
    df['condition_entry'] = (df['field_or_code'].apply(is_condition_entry) & df['is_column_loc'])
    df['condition_type'] = (df['field_or_code'].apply(is_condition_header) & df['is_column_loc'])
    df['condition_type'] = df.apply(
        lambda row: row['field_or_code'].split('-')[0].strip().title()
        if row['condition_type'] else None, axis=1)
    df['condition_type'].fillna(method='ffill', inplace=True)
    df = df[~df['field_or_code'].apply(is_condition_header)].copy()
    df['condition_number'] = df.apply(
        lambda row: ''.join([i for i in row['field_or_code'] if i.isnumeric()])
        if row['condition_entry'] else None, axis=1
    )
    df.loc[df['condition_entry'], 'field_or_code'] = df[df['condition_entry']].apply(
        lambda row: f'{row["condition_type"]}_condition_{row["condition_number"]}', axis=1)
    df.loc[df['condition_entry'], 'code_value'] = pd.np.nan
    df.loc[df['condition_entry'], 'is_column_loc'] = False
    return df
def drop_useless_rows(df):
    df = df[df.is_column_loc | df.is_encoding].copy()
    df = df[~(df['field_or_code'].apply(is_continuation) & df['is_column_loc'])].copy()
    df = df[~df['field_or_code'].apply(lambda x: str(x).startswith('Reserved'))].copy()
    return df
def handle_missing_values(df):
    df['location'].fillna(method='ffill', inplace=True)
    df['size'].fillna(method='ffill', inplace=True)
    df.fillna(value='', inplace=True)
    return df
def tidy_data(df):
    df['code_value'] = df['code_value'].apply(lambda x:
        str(x)[str(x).find('...') + len('...'):].strip() if str(x).find('...') >= 0 else pd.np.nan)
    # documentation uses 1-based indexing, we need 0-based
    df['location_start'] = df['location'].apply(lambda x: int(x.split('-')[0]) - 1)
    # first page of results has non-unique code keys for US vs territorial datasets
    # retaining first keeps US keys
    df.drop_duplicates(subset={'location', 'size', 'field_or_code'}, inplace=True)
    df['size'] = df['size'].apply(int)
    df['location_end'] = df.apply(
        lambda row: row['location_start'] + row['size'], axis=1)
    df['field'] = df.apply(lambda row: row['field_or_code']
                           if any([row['is_column_loc'], row['condition_entry']]) else pd.np.nan, axis=1)
    df['field'].fillna(method='ffill', inplace=True)
    df['field'] = df['field'].apply(lambda x: slugify(x).replace('-', '_'))
    df.rename(columns={'field_or_code': 'code'}, inplace=True)
    return df
def populate_field_data(df, year):
    df['field_to_keep'] = df['field'].apply(lambda x:
        any([bool(safe_re_search(pattern, x)) for pattern in UNCODED_FIELDS_TO_KEEP]))
    df = df[~df['is_column_loc'] | df['field_to_keep']].copy()
    df = df[~df['field'].isin(FIELDS_TO_DROP)].copy()
    # enforcing title case to remove source of inconsistencies across years
    df['code'] = df['code'].apply(str.title)
    field_codes = defaultdict(dict)
    df.apply(lambda row:
        field_codes[row['field']].update({row['code']: row['code_value']}), axis=1)
    field_codes.update(HARDCODED_CODES[year])
    field_codes = {k: v for k, v in field_codes.items()
                   if not any([re.search(pattern, k) for pattern in FIELD_MAP_PATTERNS_TO_DROP])}
    df.drop_duplicates(inplace=True)
    df.sort_values(by=['location_start'], inplace=True)
    df['location'] = df.apply(
        lambda row: (row['location_start'], row['location_end']), axis=1)
    df.drop_duplicates(subset=['location'], inplace=True)
    field_locations = OrderedDict()
    df.apply(lambda row: field_locations.update({row['field']: row['location']}), axis=1)
    field_locations.update(HARDCODED_LOCATIONS[year])
    field_locations = OrderedDict(sorted(field_locations.items(), key=lambda x: x[1][0]))
    return field_locations, dict(field_codes), df
def parse_pdf_data(df, year):
    df = add_basic_column_data(df)
    df = drop_useless_rows(df)
    df = update_condition_names(df)
    check_for_duplicate_rows(df)
    df = handle_missing_values(df)
    df = tidy_data(df)
    return populate_field_data(df, year)
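# Illustrative (hypothetical) shapes of what parse_pdf_data returns:
#   field_locations: OrderedDict of 0-based, half-open column spans, e.g.
#       OrderedDict([('resident_status', (19, 20)), ..., ('detail_age_type', (69, 70)), ...])
#   field_map: nested dict of value codes per field, e.g.
#       {'resident_status': {'1': 'RESIDENTS', ...}, 'detail_age_type': {1: 'Years', ...}, ...}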
def read_dataset(year, field_locations):
    full_path = os.path.join(INPUT_DATA_DIR, DATA_PATHS[year])
    return pd.read_fwf(full_path, colspecs=list(field_locations.values()),
                       names=list(field_locations.keys()), header=None, dtype=object)
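# pd.read_fwf treats each colspec as a 0-based, half-open (start, end) interval,
# which is why tidy_data subtracts 1 from the manuals' 1-based tape locations.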
def validate_data_types(df, year):
    # since we don't know the complete list of columns to expect, we only
    # validate a small sample that definitely ought to exist with consistent formats
    if not all([
        set(df['autopsy'].apply(str.upper).unique()) == {'Y', 'N', 'U'},
        set(df['sex'].unique()) == {'M', 'F'},
        all(df['detail_age_type'].apply(str.isnumeric)),
        df['current_data_year'].unique().tolist() == [str(year)],
        len([x for x in df.columns if 'record' in x]) >= 20,
        len([x for x in df.columns if 'entity' in x]) >= 20,
    ]):
        pdb.set_trace()
        raise ValueError(f"Invalid datatype found in {year} data")
def safe_first_re_match(pattern, text):
    match = safe_re_search(pattern, text)
    if match:
        return match[0]
def extract_cause_title(text):
    return safe_first_re_match(r'(?i)\b\d{1,4}\s*selected\s*causes\b', text)
def is_icd_data_header(text):
    return bool(safe_re_search(r'(?i)^recodetsexagecause', re.sub(r'\s', '', text)))
def extract_recode(text):
    return safe_first_re_match(r'^\d{3}', text)
def remove_filler_rows(df):
    df['is_header'] = df['text'].apply(is_icd_data_header)
    cause_title_indexes = list(df[df.is_cause_title].index)
    header_indexes = list(df[df.is_header].index)
    filler_indexes = [x for x in zip(cause_title_indexes, header_indexes)]
    for min_idx, max_idx in filler_indexes:
        df = df[(df.index < min_idx) | (df.index > max_idx)].copy()
    return df
def validate_ICD_codes(icd_codes):
    # codes should form a complete sequence within each group
    for cause_title, code_group in icd_codes.items():
        int_icd_codes = sorted([int(x) for x in code_group.keys()])
        expected_code_range = [i for i in range(min(int_icd_codes), max(int_icd_codes) + 1)]
        if int_icd_codes != expected_code_range:
            pdb.set_trace()
            raise ValueError(f'ICD codes for {cause_title} do not form a complete sequence')
def read_ICD_codes(df):
    df.columns = ['text']
    df.dropna(inplace=True)
    # some lines end up improperly quote wrapped due to a tabula error
    df['text'] = df[~df['text'].isnull()].copy()['text'].apply(
        lambda x: x.strip('"') if not pd.isnull(x) else pd.np.nan)
    df['cause_title'] = df['text'].apply(extract_cause_title)
    df['is_cause_title'] = ~df['cause_title'].isnull()
    df['cause_title'].fillna(method='ffill', inplace=True)
    df = remove_filler_rows(df)
    df['recode'] = df['text'].apply(extract_recode)
    df['recode'].fillna(method='ffill', inplace=True)
    # very first row can still be blank; can be safely dropped
    df = df.dropna(subset=['text', 'recode'], how='any').copy()
    df['text'] = (df[['text', 'recode', 'cause_title']].groupby(['cause_title', 'recode'])
                  .transform(lambda x: ' '.join(x)))
    df.drop_duplicates(inplace=True)
    # tabula fails to retain spacing; we cannot recover subtotal/gender/age codes
    df['text'] = df['text'].apply(lambda x: x.lstrip(digits + whitespace).strip())
    icd_codes = {title: dict() for title in df['cause_title'].unique()}
    df.apply(lambda row: icd_codes[row['cause_title']].update({row['recode']: row['text']}), axis=1)
    validate_ICD_codes(icd_codes)
    return icd_codes
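# read_ICD_codes returns a nested dict keyed by recode-group title, with hypothetical
# values like {'358 Selected Causes': {'001': 'Salmonella Infections', ...}, ...};
# update_field_map below substitutes these into the matching *_cause_recode fields.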
def update_field_map(field_map, icd_codes):
    for code_group, icd_values in icd_codes.items():
        group_number = safe_first_re_match(r'\d{1,4}', code_group)
        matching_pattern = f'(?i){group_number}.+Cause_Recode$'
        field_map_matching_key = [x for x in field_map.keys() if safe_re_search(matching_pattern, x)][0]
        field_map[field_map_matching_key] = icd_values
    return field_map
def export_code_maps(field_map, year):
    with open(os.path.join(EXPORT_DATA_DIR, f'{year}_codes.json'), 'w+') as f_open:
        json.dump(field_map, f_open)
def process_year(year):
    print(f"Unpacking pdf for {year}")
    documentation_data = extract_pdf(year)
    field_locations, field_map, df = parse_pdf_data(documentation_data[0], year)
    print(f"Parsing fixed-width file for {year}")
    df = read_dataset(year, field_locations)
    validate_data_types(df, year)
    df.to_csv(os.path.join(EXPORT_DATA_DIR, f'{year}_data.csv'), index=False)
    print("Exporting column code mappings")
    icd_codes = read_ICD_codes(documentation_data[1])
    field_map = update_field_map(field_map, icd_codes)
    export_code_maps(field_map, year)
    print(f"Finished {year}")
def process_all_years():
    for year in YEARS_TO_PROCESS:
        process_year(year)
if __name__ == '__main__':
    process_all_years()
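# Minimal downstream usage sketch (hypothetical field names; assumes the exports
# above have been produced):
#   with open(os.path.join(EXPORT_DATA_DIR, '2015_codes.json')) as f_open:
#       codes = json.load(f_open)
#   df = pd.read_csv(os.path.join(EXPORT_DATA_DIR, '2015_data.csv'), dtype=object)
#   df['resident_status'] = df['resident_status'].map(codes.get('resident_status', {}))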