Last active
December 14, 2020 09:01
-
-
Save tmathmeyer/384c7261e9b356569170a37fead55ddd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
from html.parser import HTMLParser | |
import inspect | |
import requests | |
import sqlite3 | |
URL = 'https://blue.kingcounty.com/Assessor/eRealProperty/Detail.aspx?ParcelNbr={}' | |
def FixEmptyRowNames(names): | |
for name in names: | |
if name != '': | |
yield name | |
def BindNames(names, values): | |
for name, value in zip(names, values): | |
if name != '': | |
yield name, value | |
class HtmlTable(object): | |
__slots__ = ['_tableName', '_tableRowType', '_tableRows', '_header', | |
'_tableCols', '_currentTableRow', '_currentCellEntry', | |
'_kvpMapping'] | |
def __init__(self, tablename, tablecols, header=True): | |
self._tableName = tablename | |
self._tableRowType = namedtuple( | |
f'{tablename}_entry', list(FixEmptyRowNames(tablecols))) | |
self._tableRows = [] | |
self._header = header | |
self._tableCols = tablecols | |
self._currentTableRow = None | |
self._currentCellEntry = None | |
self._kvpMapping = None | |
def feedElement(self, tag, attrs): | |
if tag == 'tr': | |
assert self._currentTableRow is None | |
if not self._header: | |
self._currentTableRow = [] | |
return | |
if tag == '/tr': | |
if self._header: | |
self._header = False | |
return | |
columns = dict(BindNames(self._tableCols, self._currentTableRow)) | |
self._tableRows.append(self._tableRowType(**columns)) | |
assert self._currentTableRow is not None | |
self._currentTableRow = None | |
return | |
if tag == 'td': | |
assert self._currentTableRow is not None | |
assert self._currentCellEntry is None | |
self._currentCellEntry = '' | |
return | |
if tag == '/td': | |
assert self._currentTableRow is not None | |
assert self._currentCellEntry is not None | |
self._currentTableRow.append(self._currentCellEntry) | |
self._currentCellEntry = None | |
return | |
def feedData(self, data): | |
data = data.strip() | |
if not data: | |
return | |
if self._currentCellEntry is not None: | |
self._currentCellEntry = data.strip() | |
elif self._currentTableRow is not None: | |
print(f'dropping data: {data}') | |
def rows(self): | |
def copy(): | |
yield from self._tableRows | |
return list(copy()) | |
def row(self, row): | |
return self._tableRows[row] | |
def ID(self): | |
return self._tableName | |
def get(self, key, default=None): | |
assert self._tableCols == ['key', 'value'] | |
if self._kvpMapping is None: | |
self._kvpMapping = {i.key: i.value for i in self._tableRows} | |
return self._kvpMapping.get(key, default) | |
def ExtractTables(content, tableDefs): | |
class QuickParser(HTMLParser): | |
def __init__(self): | |
super().__init__() | |
self._currentTableDef = None | |
self._tableIDMap = {t.ID():t for t in tableDefs.values()} | |
def handle_starttag(self, tag, attrs): | |
attrs = dict(attrs) | |
if tag != 'table' and self._currentTableDef is None: | |
return | |
if tag == 'table' and self._currentTableDef is None: | |
tableId = attrs.get('id', None) | |
self._currentTableDef = self._tableIDMap.get(tableId, None) | |
self._tableCount = 1 | |
elif self._currentTableDef is not None: | |
self._currentTableDef.feedElement(tag, attrs) | |
if tag == 'table': | |
self._tableCount += 1 | |
def handle_endtag(self, tag): | |
if self._currentTableDef is None: | |
return | |
if tag == 'table': | |
self._tableCount -= 1 | |
if self._tableCount == 0: | |
self._currentTableDef = None | |
else: | |
self._currentTableDef.feedElement(f'/{tag}', None) | |
def handle_data(self, data): | |
if self._currentTableDef: | |
self._currentTableDef.feedData(data) | |
QuickParser().feed(content) | |
return tableDefs | |
def intWCommas(value): | |
return int(float(value.replace(',', ''))) | |
def sqlmat(value, param): | |
if param.name == 'parcelID': | |
return value.replace('-', '') | |
if type(param.default) == str: | |
return f'"{value}"' | |
if type(param.default) == int: | |
return str(intWCommas(str(value))) | |
raise ValueError(value) | |
def AddDBColumn(db, tablename, fn, **kwargs): | |
noNonesense = {} | |
for k, v in kwargs.items(): | |
if v is not None: | |
noNonesense[k] = v | |
data = fn(**noNonesense) | |
params = inspect.signature(fn).parameters | |
headers = ','.join(x[0] for x in data) | |
values = ','.join(sqlmat(x[1], params[x[0]]) for x in data) | |
statement = f'insert into {tablename}({headers}) values({values})' | |
db.execute(statement) | |
def ParcelRowEntry(parcelID=0, address='', use_vacant='UNKNOWN', | |
use_imp='UNKNOWN', use_pres='UNKNOWN', lot_size=0, | |
zoning='UNKNOWN', water='UNKNOWN', septic='UNKNOWN', | |
road_access='UNKNOWN', street='PAVED', rainier_view='NO', | |
territorial_view='NO', olympics_view='NO', lake_wa_view='NO', | |
water_on_property='NO', waterfront_footage=0, | |
waterfront_access='NO', poor_quality='UNKNOWN', | |
historic='NO', deed_restrictions='NO', easements='NO', | |
water_issues='NO', other_issue='NO', year_built=0, | |
renovated=0, stories=0, grade=0, condition='UNKNOWN', | |
finished_area=0, basement_area=0, garage_area=0, bedrooms=0, | |
full_baths=0, tq_baths=0, h_baths=0, heat='UNKNOWN', | |
heat_delivery='UNKNOWN', deck_area=0, open_porch_area=0, | |
enclosed_porch_area=0, ultimate_assessment=0, | |
penultimate_assessment=0, five_year_asmnt_growth=0, | |
ten_year_asmnt_growth=0, ultimate_sale_price=0, | |
ultimate_sale_pct_asment=0, avg_sale_pct_asment=0): | |
return list(locals().items()) | |
def CreateTableFunction(tableName, tableSpec): | |
command = f'CREATE TABLE {tableName}(' | |
for (col, attr) in tableSpec: | |
if command[-1] != '(': | |
command += ', ' | |
command += f'{col} {" ".join(attr)}' | |
command += ')' | |
return command | |
def CreateTable(db, tableName, columnAdder): | |
schema = [] | |
params = inspect.signature(columnAdder).parameters | |
for name in params: | |
param = params[name] | |
if param.default != inspect.Parameter.empty: | |
props = [] | |
if type(param.default) == str: | |
props.append('TEXT') | |
elif type(param.default) == int: | |
props.append('INTEGER') | |
else: | |
print(param.default) | |
assert False | |
if not schema: | |
props.append('PRIMARY KEY') | |
schema.append((name, props)) | |
statement = CreateTableFunction(tableName, schema) | |
db.execute(statement) | |
def ParseParcelPage(content): | |
parcelInfoTableId = HtmlTable( | |
'cphContent_DetailsViewParcel', ['key', 'value'], header=False) | |
landAreaUsageTableId = HtmlTable( | |
'cphContent_DetailsViewLand', ['key', 'value'], header=False) | |
landRestrictionsUtilitiesTableId = HtmlTable( | |
'cphContent_DetailsViewLandSystem', ['key', 'value'], header=False) | |
landViewsTableId = HtmlTable( | |
'cphContent_DetailsViewLandViews', ['key', 'value'], header=False) | |
landWaterfrontTableId = HtmlTable( | |
'cphContent_DetailsViewLandWaterfront', ['key', 'value'], header=False) | |
landIssuesTableId = HtmlTable( | |
'cphContent_DetailsViewLandProblems', ['key', 'value'], header=False) | |
landDesignationsTableId = HtmlTable( | |
'cphContent_DetailsViewLandDesignations', ['key', 'value'], header=False) | |
buildingInfoTableId = HtmlTable( | |
'cphContent_DetailsViewResBldg', ['key', 'value'], header=False) | |
taxRollHistoryTable = HtmlTable('cphContent_GridViewTaxRoll', [ | |
'', 'year', '', '', 'levy', 'land', 'imp', 'total', '', '', '', '', '']) | |
salesHistoryTable = HtmlTable('cphContent_GridViewSales', [ | |
'', '', 'date', 'price', 'seller', 'buyer', '', '']) | |
return ExtractTables(content, { | |
'parcelInfo': parcelInfoTableId, | |
'landUsage': landAreaUsageTableId, | |
'landRestrictions':landRestrictionsUtilitiesTableId, | |
'landViews':landViewsTableId, | |
'landWaterfront':landWaterfrontTableId, | |
'landIssues':landIssuesTableId, | |
'landDesignations':landDesignationsTableId, | |
'buildingInfo':buildingInfoTableId, | |
'taxRoll':taxRollHistoryTable, | |
'sales': salesHistoryTable, | |
}) | |
def ParseParcelId(db, tableName, parcel): | |
content = requests.get(URL.format(parcel)).content.decode('utf-8') | |
tables = ParseParcelPage(content) | |
tax_roll = tables['taxRoll'].rows() | |
five_ago_y = min(5, len(tax_roll)) | |
five_ago = intWCommas(tax_roll[five_ago_y-1].total)+1 | |
five_change = intWCommas(tax_roll[0].total) / five_ago | |
five_geo = five_change**(1/five_ago_y) | |
five_pct = (five_geo - 1) * 100 | |
ten_ago_y = min(10, len(tax_roll)) | |
ten_ago = intWCommas(tax_roll[ten_ago_y-1].total)+1 | |
ten_change = intWCommas(tax_roll[0].total) / ten_ago | |
ten_geo = ten_change**(1/ten_ago_y) | |
ten_pct = (ten_geo - 1) * 100 | |
prices = [] | |
pcts = [] | |
taxes = iter(tax_roll) | |
for sale in tables['sales'].rows(): | |
sale_price = intWCommas(sale.price[1:-3]) | |
prices.append(sale_price) | |
year = intWCommas(sale.date.split('/')[2]) | |
test_year = 9999 | |
assessment = None | |
while year < test_year: | |
try: | |
assessment = next(taxes) | |
test_year = intWCommas(assessment.year) | |
except: | |
test_year = year | |
pass | |
closest_assessment = intWCommas(assessment.total) | |
if closest_assessment != 0: | |
pcts.append(100 * sale_price / closest_assessment) | |
average_pct = (sum(pcts) / len(pcts)) if pcts else 0 | |
grade = -1 | |
if tables['buildingInfo'].rows(): | |
grade = int(tables['buildingInfo'].get('Grade').split()[0]) | |
AddDBColumn(db, tableName, ParcelRowEntry, | |
parcelID=tables['parcelInfo'].get('Parcel'), | |
address=tables['parcelInfo'].get('Site Address'), | |
use_vacant=tables['landUsage'].get('Highest & Best Use As If Vacant'), | |
use_imp=tables['landUsage'].get('Highest & Best Use As Improved'), | |
use_pres=tables['landUsage'].get('Present Use'), | |
lot_size=tables['landUsage'].get('Land SqFt'), | |
zoning=tables['landRestrictions'].get('Zoning'), | |
water=tables['landRestrictions'].get('Water'), | |
septic=tables['landRestrictions'].get('Sewer/Septic'), | |
road_access=tables['landRestrictions'].get('Road Access'), | |
street=tables['landRestrictions'].get('Street Surface'), | |
rainier_view=tables['landViews'].get('Rainier'), | |
territorial_view=tables['landViews'].get('Territorial'), | |
olympics_view=tables['landViews'].get('Olympics'), | |
lake_wa_view=tables['landViews'].get('Lake Washington'), | |
water_on_property=tables['landViews'].get('Lake/River/Creek'), | |
waterfront_footage=tables['landWaterfront'].get('Waterfront Footage'), | |
waterfront_access=tables['landWaterfront'].get('Waterfront Access Rights'), | |
poor_quality=tables['landWaterfront'].get('Poor Quality'), | |
historic=tables['landDesignations'].get('Historic Site'), | |
deed_restrictions=tables['landDesignations'].get('Deed Restrictions'), | |
easements=tables['landDesignations'].get('Easements'), | |
water_issues=tables['landIssues'].get('Water Problems'), | |
other_issue=tables['landIssues'].get('Other Problems'), | |
year_built=tables['buildingInfo'].get('Year Built'), | |
renovated=tables['buildingInfo'].get('Year Renovated'), | |
stories=tables['buildingInfo'].get('Stories'), | |
grade=grade, | |
condition=tables['buildingInfo'].get('Condition'), | |
finished_area=tables['buildingInfo'].get('Total Finished Area'), | |
basement_area=tables['buildingInfo'].get('Total Basement'), | |
garage_area=tables['buildingInfo'].get('Attached Garage'), | |
bedrooms=tables['buildingInfo'].get('Bedrooms'), | |
full_baths=tables['buildingInfo'].get('Full Baths'), | |
tq_baths=tables['buildingInfo'].get('3/4 Baths'), | |
h_baths=tables['buildingInfo'].get('1/2 Baths'), | |
heat=tables['buildingInfo'].get('Heat Source'), | |
heat_delivery=tables['buildingInfo'].get('Heat System'), | |
deck_area=tables['buildingInfo'].get('Deck Area SqFt'), | |
open_porch_area=tables['buildingInfo'].get('Open Porch SqFt'), | |
enclosed_porch_area=tables['buildingInfo'].get('Enclosed Porch SqFt'), | |
ultimate_assessment=tables['taxRoll'].row(0).total, | |
penultimate_assessment=tables['taxRoll'].row(1).total, | |
five_year_asmnt_growth=five_pct, | |
ten_year_asmnt_growth=ten_pct, | |
ultimate_sale_price=prices[0] if prices else 0, | |
ultimate_sale_pct_asment=pcts[0] if pcts else 0, | |
avg_sale_pct_asment=average_pct) | |
if __name__ == '__main__': | |
if len(sys.argv) == 1: | |
print('Usage: ./datagrabber.py [parcel_ids_file]') | |
return | |
conn = sqlite3.connect('parcels.db') | |
CreateTable(conn.cursor(), 'parcels', ParcelRowEntry) | |
conn.commit() | |
with open(sys.argv[1], 'r') as f: | |
for line in f.readlines(): | |
parcel = line.strip() | |
print(f'pulling data for parcel {parcel}') | |
try: | |
ParseParcelId(conn.cursor(), 'parcels', parcel) | |
conn.commit() | |
except Exception as E: | |
print(f'failed to parse parcel: {parcel}') | |
conn.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment