Skip to content

Instantly share code, notes, and snippets.

@tmathmeyer
Last active December 14, 2020 09:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tmathmeyer/384c7261e9b356569170a37fead55ddd to your computer and use it in GitHub Desktop.
Save tmathmeyer/384c7261e9b356569170a37fead55ddd to your computer and use it in GitHub Desktop.
from collections import namedtuple
from html.parser import HTMLParser
import inspect
import sqlite3
import sys

import requests
URL = 'https://blue.kingcounty.com/Assessor/eRealProperty/Detail.aspx?ParcelNbr={}'
def FixEmptyRowNames(names):
    """Yield, in order, every entry of `names` that is not the empty string."""
    yield from (label for label in names if label != '')
def BindNames(names, values):
    """Yield (name, value) pairs, skipping positions whose name is ''.

    Names and values are paired positionally; pairing stops at the end of the
    shorter input.
    """
    yield from (
        (label, cell) for label, cell in zip(names, values) if label != '')
class HtmlTable(object):
    """Collects the rows of one HTML table as namedtuples.

    The table is driven by a stream of tag events (`feedElement`) and text
    events (`feedData`). Each completed row becomes an instance of a
    namedtuple whose fields are the non-empty entries of `tablecols`; cells
    at positions whose column name is '' are discarded.

    Args:
        tablename: identifier returned by `ID()`; also used to name the row
            namedtuple type.
        tablecols: positional column names; use '' for columns to ignore.
        header: when True, everything up to the first '/tr' is treated as a
            header row and produces no entry.
    """

    __slots__ = ['_tableName', '_tableRowType', '_tableRows', '_header',
                 '_tableCols', '_currentTableRow', '_currentCellEntry',
                 '_kvpMapping']

    def __init__(self, tablename, tablecols, header=True):
        self._tableName = tablename
        self._tableRowType = namedtuple(
            f'{tablename}_entry', list(FixEmptyRowNames(tablecols)))
        self._tableRows = []
        self._header = header
        self._tableCols = tablecols
        self._currentTableRow = None    # list of cell texts while inside a row
        self._currentCellEntry = None   # text of the cell currently open
        self._kvpMapping = None         # lazily built key -> value cache

    def feedElement(self, tag, attrs):
        """Advance the row/cell state machine for one tag event.

        `tag` is 'tr' / 'td' for opening tags and '/tr' / '/td' for closing
        tags; every other tag is ignored. `attrs` is accepted but unused.
        The asserts guard the expected nesting (row -> cell).
        """
        if tag == 'tr':
            assert self._currentTableRow is None
            # While the header row is pending no row list is opened, so a
            # 'td' inside the header row would trip the assert below.
            if not self._header:
                self._currentTableRow = []
            return
        if tag == '/tr':
            if self._header:
                # End of the header row: real data rows start from here.
                self._header = False
                return
            # Check the invariant before the row is consumed, so a stray
            # '/tr' fails the assert instead of a TypeError from zip().
            assert self._currentTableRow is not None
            columns = dict(BindNames(self._tableCols, self._currentTableRow))
            self._tableRows.append(self._tableRowType(**columns))
            # A new row makes any previously built key/value cache stale.
            self._kvpMapping = None
            self._currentTableRow = None
            return
        if tag == 'td':
            assert self._currentTableRow is not None
            assert self._currentCellEntry is None
            self._currentCellEntry = ''
            return
        if tag == '/td':
            assert self._currentTableRow is not None
            assert self._currentCellEntry is not None
            self._currentTableRow.append(self._currentCellEntry)
            self._currentCellEntry = None
            return

    def feedData(self, data):
        """Record text for the open cell; whitespace-only text is ignored.

        A later text chunk replaces an earlier one for the same cell. Text
        that arrives inside a row but outside any cell is reported on stdout
        and dropped.
        """
        data = data.strip()
        if not data:
            return
        if self._currentCellEntry is not None:
            self._currentCellEntry = data
        elif self._currentTableRow is not None:
            print(f'dropping data: {data}')

    def rows(self):
        """Return a new list containing every completed row."""
        return list(self._tableRows)

    def row(self, row):
        """Return the completed row at index `row`."""
        return self._tableRows[row]

    def ID(self):
        """Return the table name given at construction."""
        return self._tableName

    def get(self, key, default=None):
        """Look up `key` in a two-column ['key', 'value'] table.

        Returns the matching row's value, or `default` when no row has that
        key. The lookup dict is rebuilt whenever rows were added since the
        last call.
        """
        assert self._tableCols == ['key', 'value']
        if self._kvpMapping is None:
            self._kvpMapping = {i.key: i.value for i in self._tableRows}
        return self._kvpMapping.get(key, default)
def ExtractTables(content, tableDefs):
    """Parse `content` as HTML and feed matching tables into `tableDefs`.

    Args:
        content: HTML text.
        tableDefs: dict whose values expose `ID()`, `feedElement(tag, attrs)`
            and `feedData(data)`. A `<table>` whose `id` attribute equals a
            value's `ID()` has its inner tags and text forwarded to it.

    Returns:
        The same `tableDefs` dict that was passed in.
    """

    class QuickParser(HTMLParser):
        """Routes parser events to whichever table def is currently open."""

        def __init__(self):
            super().__init__()
            self._currentTableDef = None
            self._tableIDMap = {
                table.ID(): table for table in tableDefs.values()}

        def handle_starttag(self, tag, attrs):
            attr_map = dict(attrs)
            active = self._currentTableDef
            if active is None:
                # Outside any tracked table only a <table> tag matters.
                if tag == 'table':
                    self._currentTableDef = self._tableIDMap.get(
                        attr_map.get('id'))
                    self._tableCount = 1
                return
            active.feedElement(tag, attr_map)
            if tag == 'table':
                # Nested table: count depth so its </table> does not close
                # the tracked one.
                self._tableCount += 1

        def handle_endtag(self, tag):
            active = self._currentTableDef
            if active is None:
                return
            if tag != 'table':
                active.feedElement(f'/{tag}', None)
                return
            self._tableCount -= 1
            if self._tableCount == 0:
                self._currentTableDef = None

        def handle_data(self, data):
            if self._currentTableDef:
                self._currentTableDef.feedData(data)

    parser = QuickParser()
    parser.feed(content)
    return tableDefs
def intWCommas(value):
    """Convert a numeric string that may contain commas into an int.

    Commas are removed, the remainder is parsed as a float, and the float is
    truncated toward zero.
    """
    without_commas = value.replace(',', '')
    as_float = float(without_commas)
    return int(as_float)
def sqlmat(value, param):
    """Render `value` as a SQL literal for the column described by `param`.

    Args:
        value: the raw value to store.
        param: an `inspect.Parameter`; its name selects the parcel-ID special
            case and the type of its default selects text vs. integer
            rendering.

    Returns:
        A string that can be spliced into an INSERT statement: a single-quoted
        text literal (embedded single quotes doubled), or a bare integer.

    Raises:
        ValueError: if the parcel ID is not purely decimal digits after
            removing dashes, if an integer value cannot be parsed, or if the
            parameter's default is neither `str` nor `int`.
    """
    if param.name == 'parcelID':
        # str() so the integer default (0) is handled like scraped text.
        digits = str(value).replace('-', '')
        # The ID is emitted unquoted, so anything but digits must be refused.
        if not digits or any(ch not in '0123456789' for ch in digits):
            raise ValueError(value)
        return digits
    default_type = type(param.default)
    if default_type is str:
        # Standard SQL text literal: single quotes, with ' escaped as ''.
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"
    if default_type is int:
        return str(intWCommas(str(value)))
    raise ValueError(value)
def AddDBColumn(db, tablename, fn, **kwargs):
    """Insert one row into `tablename`, built from the keyword arguments.

    Keyword arguments whose value is None are dropped, so `fn` falls back to
    its own default for them. `fn` is called with the remaining arguments and
    must return (column, value) pairs; each value is rendered with `sqlmat`
    using the matching parameter of `fn`, and the resulting INSERT statement
    is passed to `db.execute`.
    """
    supplied = {key: val for key, val in kwargs.items() if val is not None}
    row = fn(**supplied)
    spec = inspect.signature(fn).parameters
    column_names = [entry[0] for entry in row]
    literals = [sqlmat(entry[1], spec[entry[0]]) for entry in row]
    headers = ','.join(column_names)
    values = ','.join(literals)
    db.execute(f'insert into {tablename}({headers}) values({values})')
def ParcelRowEntry(parcelID=0, address='', use_vacant='UNKNOWN',
                   use_imp='UNKNOWN', use_pres='UNKNOWN', lot_size=0,
                   zoning='UNKNOWN', water='UNKNOWN', septic='UNKNOWN',
                   road_access='UNKNOWN', street='PAVED', rainier_view='NO',
                   territorial_view='NO', olympics_view='NO', lake_wa_view='NO',
                   water_on_property='NO', waterfront_footage=0,
                   waterfront_access='NO', poor_quality='UNKNOWN',
                   historic='NO', deed_restrictions='NO', easements='NO',
                   water_issues='NO', other_issue='NO', year_built=0,
                   renovated=0, stories=0, grade=0, condition='UNKNOWN',
                   finished_area=0, basement_area=0, garage_area=0, bedrooms=0,
                   full_baths=0, tq_baths=0, h_baths=0, heat='UNKNOWN',
                   heat_delivery='UNKNOWN', deck_area=0, open_porch_area=0,
                   enclosed_porch_area=0, ultimate_assessment=0,
                   penultimate_assessment=0, five_year_asmnt_growth=0,
                   ten_year_asmnt_growth=0, ultimate_sale_price=0,
                   ultimate_sale_pct_asment=0, avg_sale_pct_asment=0):
    """Return the arguments as a list of (parameter name, value) pairs.

    The signature doubles as the row schema: each parameter is a column and
    the type of its default (str or int) tells the schema/insert helpers how
    to treat it.
    """
    # Must be the first statement: any local bound earlier would show up as
    # an extra column.
    arguments = locals()
    return list(arguments.items())
def CreateTableFunction(tableName, tableSpec):
    """Build the CREATE TABLE statement for `tableName`.

    `tableSpec` is an iterable of (column name, attribute list) pairs; each
    column is rendered as the name followed by its attributes separated by
    spaces, and columns are separated by ', '.
    """
    column_defs = [f'{col} {" ".join(attr)}' for (col, attr) in tableSpec]
    return f'CREATE TABLE {tableName}(' + ', '.join(column_defs) + ')'
def CreateTable(db, tableName, columnAdder):
    """Create `tableName` with one column per defaulted parameter of `columnAdder`.

    A parameter whose default is a `str` becomes a TEXT column and one whose
    default is an `int` becomes an INTEGER column. Parameters without a
    default are skipped. The first column emitted is the PRIMARY KEY. The
    statement is built by `CreateTableFunction` and run via `db.execute`.

    Raises:
        TypeError: if a parameter's default is neither `str` nor `int`.
    """
    column_types = {str: 'TEXT', int: 'INTEGER'}
    schema = []
    for name, param in inspect.signature(columnAdder).parameters.items():
        if param.default is inspect.Parameter.empty:
            continue
        sql_type = column_types.get(type(param.default))
        if sql_type is None:
            # Raised explicitly (not asserted) so the check survives -O.
            raise TypeError(
                f'unsupported default for column {name}: {param.default!r}')
        props = [sql_type]
        if not schema:
            props.append('PRIMARY KEY')
        schema.append((name, props))
    db.execute(CreateTableFunction(tableName, schema))
def ParseParcelPage(content):
    """Extract the known tables from a parcel detail page.

    Builds one `HtmlTable` per table of interest (keyed by the HTML `id` of
    the table element), runs `ExtractTables` over `content`, and returns the
    resulting dict keyed by short names ('parcelInfo', 'taxRoll', 'sales',
    ...).
    """
    # (result key, HTML table id) for the header-less two-column tables.
    key_value_tables = (
        ('parcelInfo', 'cphContent_DetailsViewParcel'),
        ('landUsage', 'cphContent_DetailsViewLand'),
        ('landRestrictions', 'cphContent_DetailsViewLandSystem'),
        ('landViews', 'cphContent_DetailsViewLandViews'),
        ('landWaterfront', 'cphContent_DetailsViewLandWaterfront'),
        ('landIssues', 'cphContent_DetailsViewLandProblems'),
        ('landDesignations', 'cphContent_DetailsViewLandDesignations'),
        ('buildingInfo', 'cphContent_DetailsViewResBldg'),
    )
    tables = {}
    for result_key, element_id in key_value_tables:
        tables[result_key] = HtmlTable(
            element_id, ['key', 'value'], header=False)
    # Grid tables have a header row; '' marks columns that are not kept.
    tables['taxRoll'] = HtmlTable('cphContent_GridViewTaxRoll', [
        '', 'year', '', '', 'levy', 'land', 'imp', 'total', '', '', '', '', ''])
    tables['sales'] = HtmlTable('cphContent_GridViewSales', [
        '', '', 'date', 'price', 'seller', 'buyer', '', ''])
    return ExtractTables(content, tables)
def _AnnualizedGrowthPct(tax_roll, years):
    """Return the annualized percent change of `total` over up to `years` rows.

    Compares row 0 with row `min(years, len(tax_roll)) - 1` and takes the
    corresponding root of their ratio. `tax_roll` must be non-empty.
    """
    span = min(years, len(tax_roll))
    # +1 keeps the division defined when the older total is 0.
    baseline = intWCommas(tax_roll[span - 1].total) + 1
    ratio = intWCommas(tax_roll[0].total) / baseline
    # NOTE(review): `span` rows cover span - 1 yearly steps; the exponent is
    # kept as 1/span to preserve the existing figures -- confirm intent.
    return (ratio ** (1 / span) - 1) * 100


def _SalePctsOfAssessment(sales, tax_roll):
    """Return (prices, pcts) for the given sales.

    `prices` holds every sale price. `pcts` holds, for each sale that could
    be paired with a non-zero assessment, the sale price as a percentage of
    that assessment. Tax-roll rows are consumed by one shared iterator, so
    each sale continues where the previous one stopped; rows are advanced
    until one whose year is not later than the sale year is reached.
    """
    prices = []
    pcts = []
    remaining = iter(tax_roll)
    for sale in sales:
        # Drops the first character and the last three (presumably a
        # currency symbol and a '.00' suffix -- verify against the page).
        sale_price = intWCommas(sale.price[1:-3])
        prices.append(sale_price)
        sale_year = intWCommas(sale.date.split('/')[2])

        assessment = None
        assessed_year = 9999
        while sale_year < assessed_year:
            candidate = next(remaining, None)
            if candidate is None:
                break  # tax roll exhausted
            assessment = candidate
            try:
                assessed_year = intWCommas(assessment.year)
            except ValueError:
                break  # unparsable year: settle for this row, as before
        if assessment is None:
            # No tax-roll row left for this sale: no ratio can be computed.
            continue
        closest_assessment = intWCommas(assessment.total)
        if closest_assessment != 0:
            pcts.append(100 * sale_price / closest_assessment)
    return prices, pcts


def ParseParcelId(db, tableName, parcel):
    """Fetch one parcel's detail page and insert a summary row for it.

    Downloads `URL` formatted with `parcel`, extracts the page tables with
    `ParseParcelPage`, derives assessment growth and sale-to-assessment
    percentages, and inserts the row into `tableName` via `AddDBColumn`.
    Fields missing from the page are passed as None so the column default
    applies.

    Raises:
        ValueError: if the page contains no tax roll rows (nothing useful
            can be stored for such a page).
    """
    content = requests.get(URL.format(parcel)).content.decode('utf-8')
    tables = ParseParcelPage(content)

    tax_roll = tables['taxRoll'].rows()
    if not tax_roll:
        raise ValueError(f'no tax roll rows found for parcel {parcel}')
    five_pct = _AnnualizedGrowthPct(tax_roll, 5)
    ten_pct = _AnnualizedGrowthPct(tax_roll, 10)

    prices, pcts = _SalePctsOfAssessment(tables['sales'].rows(), tax_roll)
    average_pct = (sum(pcts) / len(pcts)) if pcts else 0

    parcel_info = tables['parcelInfo']
    land_usage = tables['landUsage']
    restrictions = tables['landRestrictions']
    views = tables['landViews']
    waterfront = tables['landWaterfront']
    designations = tables['landDesignations']
    issues = tables['landIssues']
    building = tables['buildingInfo']

    # -1 marks "no usable grade" (no building table, or no Grade text).
    grade = -1
    if building.rows():
        grade_text = building.get('Grade')
        if grade_text:
            grade = int(grade_text.split()[0])

    # With a single tax-roll row there is no previous assessment; None lets
    # the column default apply.
    penultimate = tax_roll[1].total if len(tax_roll) > 1 else None

    AddDBColumn(
        db, tableName, ParcelRowEntry,
        parcelID=parcel_info.get('Parcel'),
        address=parcel_info.get('Site Address'),
        use_vacant=land_usage.get('Highest & Best Use As If Vacant'),
        use_imp=land_usage.get('Highest & Best Use As Improved'),
        use_pres=land_usage.get('Present Use'),
        lot_size=land_usage.get('Land SqFt'),
        zoning=restrictions.get('Zoning'),
        water=restrictions.get('Water'),
        septic=restrictions.get('Sewer/Septic'),
        road_access=restrictions.get('Road Access'),
        street=restrictions.get('Street Surface'),
        rainier_view=views.get('Rainier'),
        territorial_view=views.get('Territorial'),
        olympics_view=views.get('Olympics'),
        lake_wa_view=views.get('Lake Washington'),
        water_on_property=views.get('Lake/River/Creek'),
        waterfront_footage=waterfront.get('Waterfront Footage'),
        waterfront_access=waterfront.get('Waterfront Access Rights'),
        poor_quality=waterfront.get('Poor Quality'),
        historic=designations.get('Historic Site'),
        deed_restrictions=designations.get('Deed Restrictions'),
        easements=designations.get('Easements'),
        water_issues=issues.get('Water Problems'),
        other_issue=issues.get('Other Problems'),
        year_built=building.get('Year Built'),
        renovated=building.get('Year Renovated'),
        stories=building.get('Stories'),
        grade=grade,
        condition=building.get('Condition'),
        finished_area=building.get('Total Finished Area'),
        basement_area=building.get('Total Basement'),
        garage_area=building.get('Attached Garage'),
        bedrooms=building.get('Bedrooms'),
        full_baths=building.get('Full Baths'),
        tq_baths=building.get('3/4 Baths'),
        h_baths=building.get('1/2 Baths'),
        heat=building.get('Heat Source'),
        heat_delivery=building.get('Heat System'),
        deck_area=building.get('Deck Area SqFt'),
        open_porch_area=building.get('Open Porch SqFt'),
        enclosed_porch_area=building.get('Enclosed Porch SqFt'),
        ultimate_assessment=tax_roll[0].total,
        penultimate_assessment=penultimate,
        five_year_asmnt_growth=five_pct,
        ten_year_asmnt_growth=ten_pct,
        ultimate_sale_price=prices[0] if prices else 0,
        ultimate_sale_pct_asment=pcts[0] if pcts else 0,
        avg_sale_pct_asment=average_pct)
def main():
    """Read parcel IDs from the file named on the command line and store them.

    Creates the `parcels` table in `parcels.db`, then processes the input
    file one parcel ID per line. A failure on one parcel is reported and the
    run continues with the next; each successful parcel is committed
    individually.
    """
    if len(sys.argv) == 1:
        print('Usage: ./datagrabber.py [parcel_ids_file]')
        return
    conn = sqlite3.connect('parcels.db')
    try:
        CreateTable(conn.cursor(), 'parcels', ParcelRowEntry)
        conn.commit()
        with open(sys.argv[1], 'r') as f:
            for line in f:
                parcel = line.strip()
                if not parcel:
                    # Blank lines are not parcel IDs; don't fetch for them.
                    continue
                print(f'pulling data for parcel {parcel}')
                try:
                    ParseParcelId(conn.cursor(), 'parcels', parcel)
                    conn.commit()
                except Exception as error:
                    # Best effort: report why this parcel failed, keep going.
                    print(f'failed to parse parcel: {parcel} ({error!r})')
    finally:
        conn.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment