Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Python script to scrape public SAPD data
from bs4 import BeautifulSoup
import requests
from urlparse import parse_qs
import logging
import time
import json
from datetime import timedelta, datetime
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
import ssl
import sys
# This script will attempt to download the PUBLIC sapd call records available.
# The end goal is to perform analysis that can give the public better information on
# what kind of calls are being placed, and possibly give SAPD better analysis/tools/etc
# to help assist them in their duties.
# This is needed because the site has terrible SSL support :(
class TLSv1Adapter(HTTPAdapter):
    """Transport adapter that pins HTTPS connections to TLSv1.

    Mount an instance on a ``requests.Session`` (``s.mount('https://', ...)``)
    so that requests sent through that session use a pool manager created
    with an explicit ``ssl_version``.
    """

    def init_poolmanager(self, connections, maxsize, block=False):
        """Build the connection pool manager used by this adapter.

        Args:
            connections: Number of connection pools to cache.
            maxsize: Maximum number of connections kept per pool.
            block: Whether the pool blocks when no connection is free.
        """
        # NOTE(review): the tail of this call was truncated in the source.
        # maxsize/block/ssl_version are reconstructed from the method's
        # parameters, the class name and the module-level `import ssl`
        # -- confirm ssl.PROTOCOL_TLSv1 is the intended protocol.
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block,
                                       ssl_version=ssl.PROTOCOL_TLSv1)
# Configure logging once for the whole script: INFO and above, each message
# prefixed with a 12-hour timestamp.
logging.basicConfig(
    format='%(asctime)s %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.INFO
)
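# Set our BASE URL constant to be used in requests
# NOTE(review): the BASE_URL assignment is missing from this copy of the script.
# Define BASE_URL here (the URL of the search page that is fetched and posted to
# below) before running.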
if __name__ == "__main__":
    # Script entry point: walk the date range week by week (falling back to
    # day by day when needed), post the search form for each range, and append
    # every returned call record to records_2.json.
    #
    # NOTE(review): BASE_URL is used below but its assignment is not present
    # in this copy of the script; it must be defined at module level.
    s = requests.Session()
    # Mount our SSL Hack
    s.mount('https://', TLSv1Adapter())
    logger = logging.getLogger(__name__)

    # Get the initial URL to grab the pesky VIEWSTATE, EVENTVALIDATION, etc.
    response = s.get(BASE_URL)
    if not response.ok:
        logger.error("Error getting URL: {0}".format(BASE_URL))
        logger.error("Status Code: {0}".format(str(response.status_code)))
        sys.exit(1)
    soup = BeautifulSoup(response.text, "html.parser")

    # Now let's setup our data to run through
    data = {}
    # Start on March 26, 2014
    currentDate = datetime.strptime("03/26/2014", "%m/%d/%Y")
    endDate = currentDate + timedelta(weeks=1)
    # This will be a placeholder in case we manually need to get a week by days
    daysToGet = 2
    # Max date allowed by the SAPD system (Nov. 30, 2015); the loop below
    # stops before this exclusive bound.
    maxDate = datetime.strptime("12/01/2015", "%m/%d/%Y")

    # A hacky way to write a JSON file without storing the whole object in
    # memory: each record is dumped and written as soon as it is parsed.
    # NOTE(review): every record is followed by ",\n" and no enclosing
    # brackets are written here, so the file is a comma-terminated sequence
    # of JSON objects rather than a single valid JSON document.
    with open("records_2.json", "w") as output_file:
        # Apparently, these only need to be set once?
        data["__EVENTVALIDATION"] = soup.find("input", {"id": "__EVENTVALIDATION"})['value']
        data["__VIEWSTATE"] = soup.find("input", {"id": "__VIEWSTATE"})['value']

        while currentDate < maxDate:
            # We need to set our parameter (date, category, etc) for our search
            data["txtStart"] = currentDate.strftime("%m/%d/%Y")
            data["ddlCategory"] = "Other Calls"
            data["btnSearch"] = "View Data"
            data["cbxHOA$cbxHOA_HiddenField"] = "-1"
            data["cbxHOA$cbxHOA_TextBox"] = ""
            data["ddlSchoolDistrict"] = " "
            data["ddlCouncilDistrict"] = " "
            data["txtZipcode"] = ""

            # If we have less than a week until the end date, let's run it per day
            # Also, if we previously hit the max results limit, get the week by days
            if ((maxDate - currentDate) < timedelta(weeks=1)) or daysToGet > 0:
                data["rdbSearchRange"] = "day"
                endDate = currentDate + timedelta(days=1)
                if daysToGet > 0:
                    daysToGet -= 1
            else:
                # Otherwise (99% of the queries) run the query by week
                data["rdbSearchRange"] = "week"
                endDate = currentDate + timedelta(weeks=1)
            data["txtEndData"] = endDate.strftime("%m/%d/%Y")

            # Now, we actually post the response to the server and get back the results
            response = s.post(BASE_URL, data=data)
            # If something went wrong, time to bail!
            if not response.ok or response.status_code == 500:
                logger.error("Error getting URL: {0}".format(BASE_URL))
                logger.error("Status Code: {0}".format(str(response.status_code)))
                sys.exit(1)

            # Parse the HTML returned into another "soup" object
            soup = BeautifulSoup(response.text, "html.parser")
            # Grab all the rows in the table (skip the header)
            table = soup.find("table", {"id": "gvCFS"})
            if table is None:
                # Fail with a clear message instead of an AttributeError on None
                logger.error("Results table 'gvCFS' not found for {0}".format(data["txtStart"]))
                sys.exit(1)
            calls = table.find_all("tr")[1:]

            # If we have 10k rows, we hit a max limit. Let's set a counter and
            # get this week day by day (currentDate is intentionally not advanced)
            if len(calls) >= 10000:
                logger.warning("Reached max result limit - Getting by days instead of weeks.")
                daysToGet = 7
                continue

            logger.info("Sent: {0} - {1} - {2} - {3}".format(data["txtStart"], data["txtEndData"], data["rdbSearchRange"], str(len(calls))))
            logger.info("Found: {0} - {1}".format(soup.find("span", {"id": "lblStartDate"}).text, soup.find("span", {"id": "lblEndDate"}).text))

            for call in calls:
                # Let's extract all the columns from the row into an array called "fields"
                fields = call.find_all("td")
                # Create the JSON object from the table information;
                # empty optional columns are stored as "-"
                record = {
                    "id": fields[0].text.strip(),
                    "category": fields[1].text.strip() or "-",
                    "type": fields[2].text.strip() or "-",
                    "@timestamp": fields[3].text.strip() or "-",
                    "address": fields[4].text.strip(),
                    "hoa": fields[5].text.strip().replace(u'\xa0', "") or "-",
                    "schoolDistrict": fields[6].text.strip().replace(u'\xa0', "") or "-",
                    "councilDistrict": fields[7].text.strip().replace(u'\xa0', "") or "-",
                    "zipcode": fields[8].text.strip().replace(u'\xa0', "") or "-"
                }
                output_file.write("{0},\n".format(json.dumps(record, indent=4)))

            # Little Timmy doesn't like it when people don't sleep between
            # calls to a public service!
            # NOTE(review): the sleep call itself was missing from the source;
            # a 1 second pause is assumed -- confirm the intended delay.
            time.sleep(1)

            # Update the currentDate
            # NOTE(review): stepping to endDate + 1 day presumes the server
            # includes endDate in the returned range -- confirm no day is skipped.
            currentDate = endDate + timedelta(days=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment