Build US Airman Database and Map
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import pendulum\n",
"import json\n",
"import time\n",
"import sqlite3\n",
"from calendar import monthrange\n",
"from IPython.display import display\n",
"from io import StringIO\n",
"from csv import DictReader, DictWriter, writer\n",
"from sqlalchemy import create_engine"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define some functions for loading in the data from CSVs"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def jd(o):\n",
" print(json.dumps(o, indent=4))\n",
" return None\n",
"\n",
"def dparse(dstrs):\n",
" retlist = []\n",
" for x in dstrs:\n",
" if isinstance(x, str) and len(x) == 6:\n",
" m = x[:2]\n",
" y = x[2:]\n",
" last_day = monthrange(int(y), int(m))[1]\n",
" dstr = '{y}{m}{d}'.format(y=y, m=m, d=last_day)\n",
" retlist.append(pendulum.from_format(dstr, 'YYYYMMDD').date())\n",
" elif isinstance(x, str) and len(x) == 8:\n",
" retlist.append(pendulum.from_format(x, 'MMDDYYYY').date())\n",
" else:\n",
" retlist.append('')\n",
" \n",
" return retlist\n",
"\n",
"def ct_chk(o):\n",
" \"\"\"\n",
" Custom type checking logic\n",
" \"\"\"\n",
" if o is not None and o.isdigit() and (len(o) in [6, 8]):\n",
" return 'faa_date'\n",
" else:\n",
" return o.__class__.__name__\n",
"\n",
"def schema_check(csv_list):\n",
" # fname is name of file\n",
" sd = {} # The \"schema dictionary\"\n",
" for row in csv_list:\n",
" for k, v in row.items():\n",
" if k not in sd:\n",
" sd[k] = set()\n",
" if v is not None and v.strip() != '':\n",
" t = ct_chk(v)\n",
" sd[k].add(t)\n",
" \n",
" for k, v in sd.items():\n",
" sd[k] = list(v)\n",
" \n",
" return sd\n",
"\n",
"def type_reduce(tlist):\n",
" \"\"\"\n",
" Reduce to a single type\n",
" \"\"\"\n",
" \n",
" if len(tlist) != 1:\n",
" return 'str'\n",
" if len(tlist) == 1:\n",
" return tlist[0]\n",
" \n",
"\n",
"def csv_preproc(fname):\n",
" \"\"\"\n",
" Preprocess a CSV and return dataframe object\n",
" \"\"\"\n",
" csv_list = []\n",
" schema_info = {}\n",
" prc_params = {\n",
" 'filepath_or_buffer': None, # To Be replaced\n",
" 'header': None,\n",
" 'names': [],\n",
" 'index_col': False,\n",
" 'dtype': object,\n",
" 'skiprows': 1,\n",
" 'parse_dates': [],\n",
" 'date_parser': dparse,\n",
" }\n",
"\n",
" # Load the CSV\n",
" reader = DictReader(open(fname))\n",
" for row in reader:\n",
" row.pop('') # Fucking FAA\n",
" for k, v in row.items():\n",
" row[k] = v.strip()\n",
" csv_list.append(row)\n",
"\n",
" # Type Check the fields and Strip field names\n",
" for i, (k, v) in enumerate(schema_check(csv_list).items()):\n",
" schema_info[k] = {}\n",
" cur_key = schema_info[k]\n",
" cur_key['pos_idx'] = i\n",
" cur_key['type'] = type_reduce(v)\n",
" cur_key['name'] = '_'.join(k.strip().split(' ')).lower()\n",
" \n",
" for si in schema_info.values():\n",
" prc_params['names'].append(si['name'])\n",
" if si['type'] == 'faa_date':\n",
" prc_params['parse_dates'].append(si['pos_idx'])\n",
" \n",
" #Build the CSV for pandas consumption..\n",
" ios = StringIO()\n",
" cw = writer(ios)\n",
" cw.writerow(prc_params['names'])\n",
" for row in csv_list:\n",
" new_row = []\n",
" for k, v in row.items():\n",
" new_row.append(v)\n",
" cw.writerow(new_row)\n",
" \n",
" ios.seek(0) # Reset to the beginning\n",
" prc_params['filepath_or_buffer'] = ios\n",
" \n",
" return pd.read_csv(**prc_params)"
]
},
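{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check of the date handling, using made-up sample values (not from the FAA files): a 6-digit MMYYYY string resolves to the last day of that month, an 8-digit MMDDYYYY string parses directly, and anything else passes through unchanged."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical sample values illustrating the two FAA date formats\n",
"samples = ['022019', '02252019', '']\n",
"print(dparse(samples))  # the 6-digit value should resolve to 2019-02-28\n",
"print([ct_chk(s) for s in samples])  # expect ['faa_date', 'faa_date', 'str']"
]
},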
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load the CSVs into dataframes"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Processing: PILOT_BASIC.csv\n",
"52.26949119567871\n",
"Processing: PILOT_CERT.csv\n",
"119.5246331691742\n",
"Processing: NONPILOT_BASIC.csv\n",
"13.655939102172852\n",
"Processing: NONPILOT_CERT.csv\n",
"7.561482906341553\n"
]
}
],
"source": [
"files = {\n",
" 'pb': {\n",
" 'file': 'PILOT_BASIC.csv',\n",
" },\n",
" 'pc': {\n",
" 'file': 'PILOT_CERT.csv',\n",
" },\n",
" 'npb': {\n",
" 'file': 'NONPILOT_BASIC.csv',\n",
" },\n",
" 'npc': {\n",
" 'file': 'NONPILOT_CERT.csv',\n",
" },\n",
"}\n",
"\n",
"for k, v in files.items():\n",
" ts = time.time()\n",
" print(\n",
" 'Processing: {file}'.format(file=v['file'])\n",
" )\n",
" v['df'] = csv_preproc(v['file'])\n",
" print(time.time() - ts)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## For future brevity and readability. Also, copy, because reloading takes a while"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"pb = files['pb']['df'].copy()\n",
"pc = files['pc']['df'].copy()\n",
"npb = files['npb']['df'].copy()\n",
"npc = files['npc']['df'].copy()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Validate row counts line up"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"True"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"True"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"True"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# Subtract 1, because first line in the file is the header\n",
"display(len(pb) == (sum(1 for line in open('PILOT_BASIC.csv')) - 1))\n",
"display(len(pc) == (sum(1 for line in open('PILOT_CERT.csv')) - 1))\n",
"display(len(npb) == (sum(1 for line in open('NONPILOT_BASIC.csv')) - 1))\n",
"display(len(npc) == (sum(1 for line in open('NONPILOT_CERT.csv')) - 1))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Merge the basic data, dedupe, and get rid of non-USA individuals"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"860631"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"mb = pd.concat([pb, npb]).sort_index().drop_duplicates()\n",
"display(len(mb))\n",
"\n",
"mb = mb[(mb['country'] == 'USA')]\n",
"display(len(mb))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare the certificate data"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# For space considerations and readability..\n",
"pc = files['pc']['df']\n",
"npc = files['npc']['df']\n",
"\n",
"# Now we are going to add the type ratings (which will be empty) to the non-pilot cert data for eas of concat\n",
"for x in range(1,100):\n",
" npc['typerating{x}'.format(x=x)] = pd.Series(None, index=npc.index)"
]
},
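{
"cell_type": "markdown",
"metadata": {},
"source": [
"Aside: `pd.concat` would also align mismatched columns on its own, filling the missing ones with NaN; the explicit loop above just makes the added typerating columns, and their order, predictable. A toy illustration:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy frames with mismatched columns -- concat fills the gap with NaN\n",
"a = pd.DataFrame({'x': [1], 'y': [2]})\n",
"b = pd.DataFrame({'x': [3]})\n",
"pd.concat([a, b], sort=False)  # the row from b gets y = NaN"
]
},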
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Union the certificate data (there shouldn't be any duplicates dropped)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1119188"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"mc = pd.concat([pc, npc])\n",
"display(len(mc))\n",
"\n",
"# mc = pd.concat([pc, npc]).drop_duplicates()\n",
"# display(len(mc))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now merge (join) the basic and certificate data. Inner join since we only care about US records."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1057299"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Do not use the index!! Use the unique_id\n",
"m = pd.merge(mb, mc, how='inner', left_on='unique_id', right_on='unique_id')\n",
"len(m)"
]
},
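{
"cell_type": "markdown",
"metadata": {},
"source": [
"An optional merge sanity check one could run here: if `unique_id` really is unique per airman in the basic data, the inner join can match each certificate row at most once, so the merged row count can never exceed the certificate row count."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Both should hold if unique_id identifies one basic row per airman\n",
"display(mb['unique_id'].is_unique)\n",
"display(len(m) <= len(mc))"
]
},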
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Do some column cleanup and sorting"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"m.drop(columns=['first_name_y', 'last_name_y'], inplace=True)\n",
"m.rename(columns={'first_name_x': 'first_name', 'last_name_x': 'last_name'}, inplace=True)\n",
"m.sort_values(by=['state', 'city', 'last_name', 'first_name', 'unique_id', 'type'], inplace=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create and write to sqlite3 database"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Name of sqlite file: usa_airman_201902.db\n"
]
}
],
"source": [
"# What will we call it\n",
"# db_name = input('Name of sqlite file: ')\n",
"\n",
"db_name = 'usa_airman_201902.db'\n",
"\n",
"e = create_engine('sqlite:///{sdb}'.format(sdb=db_name))"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"m.to_sql('airman', con=e, if_exists='replace')"
]
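},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick read-back check, a few rows can be pulled straight from the new `airman` table (column names per the cleanup above):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read a handful of rows back out of sqlite to confirm the write\n",
"pd.read_sql('SELECT first_name, last_name, city, state FROM airman LIMIT 5', con=e)"
]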
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}