Python script to scrape public SAPD data
from bs4 import BeautifulSoup
import requests
from urlparse import parse_qs
import logging
import time
import json
from datetime import timedelta, datetime
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
import ssl
import sys

# This script will attempt to download the PUBLIC SAPD call records available.
# The end goal is to perform analysis that can give the public better information on
# what kind of calls are being placed, and possibly give SAPD better analysis/tools/etc
# to help assist them in their duties.

# This is needed because the site has terrible SSL support :(
class TLSv1Adapter(HTTPAdapter):
    def init_poolmanager(self, connections, maxsize, block=False):
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block,
                                       ssl_version=ssl.PROTOCOL_TLSv1)

# Setup logging config
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')

# Set our BASE_URL constant to be used in requests
BASE_URL = "https://webapps2.sanantonio.gov/policecalls/Reports.aspx"
if __name__ == "__main__": | |
s = requests.Session() | |
# Mount our SSL Hack | |
s.mount('https://', TLSv1Adapter()) | |
logger = logging.getLogger(__name__) | |
# Get the initial URL to grab the pesky VIEWSTATE, EVENTVALIDATION, etc. | |
response = s.get(BASE_URL) | |
if not response.ok: | |
logger.info("Error getting URL: {0}".format(BASE_URL)) | |
logger.info("Status Code: {0}".format(str(response.status_code))) | |
soup = BeautifulSoup(response.text, "html.parser") | |
# Now let's setup our data to run through | |
data = {} | |
    # Start on March 26, 2014
    currentDate = datetime.strptime("03/26/2014", "%m/%d/%Y")
    endDate = currentDate + timedelta(weeks=1)
    # This will be a placeholder in case we manually need to get a week by days
    daysToGet = 2
    # Max date allowed by the SAPD system (Nov. 30, 2015)
    maxDate = datetime.strptime("12/01/2015", "%m/%d/%Y")
    output_file = open("records_2.json", "w")
    # A hacky way to write a JSON file without storing the whole object in memory
    output_file.write("[\n")
    # Apparently, these only need to be set once?
    data["__EVENTVALIDATION"] = soup.find("input", {"id": "__EVENTVALIDATION"})['value']
    data["__VIEWSTATE"] = soup.find("input", {"id": "__VIEWSTATE"})['value']
    while currentDate < maxDate:
        # We need to set our parameters (date, category, etc.) for our search
        data["txtStart"] = currentDate.strftime("%m/%d/%Y")
        data["ddlCategory"] = "Other Calls"
        data["btnSearch"] = "View Data"
        data["cbxHOA$cbxHOA_HiddenField"] = "-1"
        data["cbxHOA$cbxHOA_TextBox"] = ""
        data["ddlSchoolDistrict"] = " "
        data["ddlCouncilDistrict"] = " "
        data["txtZipcode"] = ""
        # If we have less than a week until the end date, let's run it per day
        # Also, if we previously hit the max results limit, get the week by days
        if ((maxDate - currentDate) < timedelta(weeks=1)) or daysToGet > 0:
            data["rdbSearchRange"] = "day"
            endDate = currentDate + timedelta(days=1)
            if daysToGet > 0:
                daysToGet -= 1
        # Otherwise (99% of the queries) run the query by week
        else:
            data["rdbSearchRange"] = "week"
            endDate = currentDate + timedelta(weeks=1)
        data["txtEndData"] = endDate.strftime("%m/%d/%Y")
        # Now, we actually post the request to the server and get back the results
        response = s.post(BASE_URL, data=data)
        # If something went wrong, time to bail!
        if not response.ok or response.status_code == 500:
            logger.info("Error getting URL: {0}".format(BASE_URL))
            logger.info("Status Code: {0}".format(str(response.status_code)))
            logger.info(str(response.text))
            sys.exit(1)
        # Parse the HTML returned into another "soup" object
        soup = BeautifulSoup(response.text, "html.parser")
        # Grab all the rows in the table (skip the header)
        calls = soup.find("table", {"id": "gvCFS"}).find_all("tr")[1:]
        # If we have 10k rows, we hit a max limit. Let's set a counter and get this week day by day
        if len(calls) >= 10000:
            logging.info("Reached max result limit - Getting by days instead of weeks.")
            daysToGet = 7
            time.sleep(10)
            continue
        logger.info("Sent: {0} - {1} - {2} - {3}".format(data["txtStart"], data["txtEndData"], data["rdbSearchRange"], str(len(calls))))
        logger.info("Found: {0} - {1}".format(soup.find("span", {"id": "lblStartDate"}).text, soup.find("span", {"id": "lblEndDate"}).text))
        for call in calls:
            # Let's extract all the columns from the row into a list called "fields"
            fields = call.find_all("td")
            # Create the JSON object from the table information
            record = {
                "id": fields[0].text.strip(),
                "category": fields[1].text.strip() or "-",
                "type": fields[2].text.strip() or "-",
                "@timestamp": fields[3].text.strip() or "-",
                "address": fields[4].text.strip(),
                "hoa": fields[5].text.strip().replace(u'\xa0', "") or "-",
                "schoolDistrict": fields[6].text.strip().replace(u'\xa0', "") or "-",
                "councilDistrict": fields[7].text.strip().replace(u'\xa0', "") or "-",
                "zipcode": fields[8].text.strip().replace(u'\xa0', "") or "-"
            }
            output_file.write("{0},\n".format(json.dumps(record, indent=4)))
        # Little Timmy doesn't like it when people don't sleep between
        # calls to a public service!
        time.sleep(10)
        # Update the currentDate
        currentDate = endDate + timedelta(days=1)
    output_file.write("]")
    output_file.close()
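
Usage note (not part of the original script): because the scraper streams each record to disk and always writes a trailing comma before the closing bracket, the resulting records_2.json is not strictly valid JSON as written. Below is a minimal sketch of one way the output could be loaded for analysis, assuming the default filename used above; the comma fix-up shown is just one possible approach.

# Sketch: load the scraped output for analysis.
# Assumes records_2.json was produced by the script above, which leaves a
# trailing ",\n" before the closing "]".
import json

with open("records_2.json") as f:
    raw = f.read()

# Only the final record is followed by ",\n]", so replace it with a clean close
records = json.loads(raw.replace(",\n]", "\n]"))

print("Loaded {0} call records".format(len(records)))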