Skip to content

Instantly share code, notes, and snippets.

@tmthyjames
Created August 27, 2016 23:45
Show Gist options
  • Save tmthyjames/68e67180b2fc3967383997c8bf68ef4c to your computer and use it in GitHub Desktop.
Save tmthyjames/68e67180b2fc3967383997c8bf68ef4c to your computer and use it in GitHub Desktop.
Scraping EPA data using randomized time delays and a PhantomJS headless browser
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"#!/usr/local/bin/python2.7\n",
"\n",
"__version__ = '1.0'\n",
"__author__ = 'tim dobbins'\n",
"\n",
"import csv, os, re, sys, time, subprocess, itertools\n",
"from random import randint, sample, uniform\n",
"from datetime import datetime\n",
"from os.path import expanduser\n",
"import requests\n",
"from requests.exceptions import ConnectionError\n",
"from bs4 import BeautifulSoup\n",
"from pattern.web import URL\n",
"import psycopg2\n",
"from lxml import html\n",
"from utility import Utility\n",
"\n",
"\"\"\"For the HistoricalWeather class, all data comes from http://www.wunderground.com/history\"\"\"\n",
"\n",
"\n",
"class EPA(object):\n",
" \n",
" TO = ['xxxxxxx@gmail.com']\n",
" FROM = 'xxxxxxx@gmail.com'\n",
" home = expanduser(\"~\") # gets home directory of computer\n",
" file_name = \"newzips_with_lat_llong.csv\" # name of file to read from\n",
"\n",
" def __init__(self, zipcode):\n",
" self.zipcode = zipcode\n",
"\n",
" @classmethod\n",
" def get_file(cls):\n",
" \"\"\"returns the name of the file to read from\"\"\"\n",
" return cls.file_name\n",
" \n",
" @staticmethod\n",
" def get_path(name):\n",
" \"\"\"retrieves the path to the file newzips_with_lat_llong.csv\"\"\"\n",
" \n",
" for root, dirs, files in os.walk(EPA.home):\n",
" if name in files:\n",
" return os.path.join(root, name)\n",
" \n",
" def _try_this(self, iterable):\n",
" \"\"\"encodes all values to make them compatible with CSV file\"\"\"\n",
"\n",
" for i in iterable:\n",
" try:\n",
" i = i.text.encode('utf-8')\n",
" return i\n",
" except IndexError:\n",
" pass\n",
" \n",
" def _groupElements(self, iterable, n):\n",
" \"\"\"takes raw list and groups every n elements as a pair and returns \n",
" a generator\"\"\"\n",
"\n",
" source = iter(iterable)\n",
" while True:\n",
" l = []\n",
" for i in range(n):\n",
" l.append(source.next())\n",
" yield list(l)\n",
" \n",
" @staticmethod \n",
" def remove_apostrophe(iterable, pattern):\n",
" \"\"\"remove apostrophes from strings\"\"\"\n",
" iterable = [re.sub(pattern, '', x) for x in iterable]\n",
" return iterable\n",
" \n",
" @staticmethod\n",
" def read_zips(path):\n",
" \"\"\"reads zips from EPA.get_path() or accepts the path that points to the \n",
" appropriate zipcodes file\"\"\"\n",
" \n",
" with open(path, 'rb') as zips:\n",
" reader = csv.reader(zips)\n",
" headers = reader.next()\n",
" headers = filter(None, headers)\n",
"\n",
" column = {}\n",
" for h in headers:\n",
" column[h] = []\n",
"\n",
" for row in reader:\n",
" for h, v in zip(headers, row):\n",
" column[h].append(v)\n",
"\n",
" zip_codes = []\n",
" no_zips = []\n",
" for i in column['zipcode']:\n",
" if len(i) == 5:\n",
" zip_codes.append(i)\n",
" else:\n",
" no_zips.append(i)\n",
"\n",
" zip_codes = column['zipcode']\n",
" latitude = zip(column['miny'], column['maxy'])\n",
" longitude = zip(column['minx'], column['maxx'])\n",
" lat_long = zip(latitude, longitude)\n",
" zip_dic = dict(zip(zip_codes, lat_long))\n",
" zip_codes_dic = []\n",
" for key, value in zip_dic.iteritems():\n",
" zip_codes_dic.append([key, value])\n",
" return zip_codes_dic\n",
" \n",
" def _get_lat_long(self):\n",
" \"\"\"retrieves latitude and longitude for the specified zipcode\"\"\"\n",
" \n",
" path = EPA.get_path(EPA.file_name)\n",
" for i in EPA.read_zips(path):\n",
" if i[0] == str(self.zipcode):\n",
" try:\n",
" miny = float(i[1][0][0])\n",
" maxy = float(i[1][0][1])\n",
" minx = float(i[1][1][0])\n",
" maxx = float(i[1][1][1])\n",
" except:\n",
" minx=miny=maxx=maxy = ''\n",
" return [miny, minx, maxy, maxx]\n",
" \n",
" def _get_air_data(self):\n",
" \"\"\"scrapes url and captures required data: aqi, zipcode, date, hour observed\"\"\"\n",
" \n",
" air_list = []\n",
" url = \"\"\"http://ws1.airnowgateway.org/GatewayWebServiceREST/Gateway.svc/observedbyzipcode?key=6601D854-A73D-4AE8-ABF2-B15D65E34943&format=xml&zipcode=\"\"\"+str(self.zipcode)\n",
" request = requests.get(url)\n",
" air_soup = BeautifulSoup(request.content)\n",
" \n",
" aqi = air_soup.find_all('aqi')\n",
" city = air_soup.find_all(re.compile('reportingarea', flags=re.IGNORECASE))\n",
" dateobserved = air_soup.find_all(re.compile('dateobserved', flags=re.IGNORECASE))\n",
" hourobserved = air_soup.find_all(re.compile('hourobserved', flags=re.IGNORECASE))\n",
" \n",
" aqi = self._try_this(aqi)\n",
" city = self._try_this(city)\n",
" dateobserved = self._try_this(dateobserved)\n",
" hourobserved = self._try_this(hourobserved)\n",
" \n",
" if city:\n",
" air_list.append([self.zipcode, city, aqi, dateobserved, hourobserved])\n",
" return air_list\n",
" \n",
" def _get_water_data(self):\n",
" \"\"\"scrapes url and captures required data: zipcode, water quality, type, area,\n",
" body of water\"\"\"\n",
" \n",
" miny, minx, maxy, maxx = self._get_lat_long()\n",
" water_list = []\n",
" subject = 'EPA - get_water_data()'\n",
" if maxx:\n",
" try:\n",
" url = 'http://www.epa.gov/myenvmap/processTMLD.aspx?minx='+str(minx)+'&miny='+str(miny)+'&maxx='+str(maxx)+'&maxy='+str(maxy)\n",
" request = requests.get(url)\n",
" water_soup = BeautifulSoup(request.content)\n",
" try:\n",
" gdata = water_soup.find_all('table')[0].find_all('td')\n",
" for i in gdata:\n",
" water_list.append(i.text.encode('utf-8'))\n",
" water_list = water_list[4:]\n",
" water_list = [i for i in self._groupElements(water_list, 4)]\n",
" [i.insert(0, self.zipcode) for i in water_list]\n",
" except IndexError as e:\n",
" print e\n",
" except requests.exceptions.Timeout as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, e)\n",
" except requests.exceptions.ConnectionError as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, e)\n",
" return water_list\n",
" \n",
" @staticmethod\n",
" def _write_data(filename, headers, _list):\n",
" \"\"\"writes data to desktop\"\"\"\n",
"\n",
" with open(EPA.home+'/Desktop/'+filename, 'w') as f:\n",
" writer = csv.writer(f, delimiter=',', quoting=csv.QUOTE_ALL)\n",
" if 'water' in filename:\n",
" [writer.writerow(headers)]\n",
" [[writer.writerow(row) for row in i] for i in _list]\n",
" elif 'air' in filename:\n",
" [writer.writerow(headers)]\n",
" [[writer.writerow(row) for row in i] for i in _list]\n",
" else:\n",
" [writer.writerow(headers)]\n",
" [writer.writerow(row) for i in _list]\n",
" \n",
" @staticmethod \n",
" def get_data(path, _type, db=None, write=None):\n",
" \"\"\"brings it all together. utilizes get_water_data() and get_air_data() to bring in data\n",
" then inserts the data into postgres database (environment), then calculates various metrics:\n",
" percentage complete, zipcodes complete, whether or not the current zipcode contains results, \n",
" how long it has been since function was called, and the size of the file being written.\n",
" Then it submits the written file to Google Drive for teamates to access. Then calculates \n",
" percentage of zipcodes that contained results.\"\"\"\n",
" \n",
" subject = 'EPA - get_data'\n",
"\n",
" if db:\n",
" connect_str = 'dbname=environment host=localhost port=5432'\n",
" conn = psycopg2.connect(connect_str)\n",
" cur = conn.cursor()\n",
"\n",
" listname = []\n",
" emptylist = []\n",
"\n",
" if _type == 'air':\n",
" headers = ['ZIP CODE','CITY','AQI','DATE OBSERVED','HOUR OBSERVED']\n",
" elif _type == 'water':\n",
" headers = ['ZIP CODE','NAME','TYPE','SIZE','STATUS']\n",
"\n",
" print '\\n\\n'\n",
"\n",
" q = 0\n",
" starttime = datetime.now()\n",
" zips = EPA.read_zips(path)\n",
" # test_zips = EPA.read_zips(path) # -> \"newzips_with_lat_llong.csv\"\n",
" for i in zips:\n",
" zcode = i[0]\n",
" lat = i[0][0]\n",
" lon = i[0][1]\n",
" epa = EPA(zcode)\n",
" if _type == 'air':\n",
" data = epa._get_air_data()\n",
" name = 'air'\n",
"\n",
" if db:\n",
" for i in data:\n",
" if i:\n",
" zipcode, city, aqi, date, hour = EPA.remove_apostrophe(i, '\\'')\n",
" cur.execute(\"\"\"insert into air (city, zipcode, aqi, observation_date, observationhour) \n",
" values ('%s', %s, %s, '%s', %d)\"\"\" % (re.sub('\\'','',city), zipcode, aqi, date, int(hour)))\n",
" conn.commit()\n",
"\n",
" elif _type == 'water':\n",
" data = epa._get_water_data()\n",
" name = 'water'\n",
"\n",
" if db:\n",
" for i in data:\n",
" if i:\n",
" zipcode, body, code, area, status = EPA.remove_apostrophe(i, '\\'')\n",
" code = re.sub('[^\\x00-\\x7F]+','', code)\n",
" area = re.sub('[^\\x00-\\x7F]+','', area)\n",
" if not code or code == ' ':\n",
" code = 0\n",
" if not area or area == ' ':\n",
" area = 'n/a'\n",
" try:\n",
" cur.execute(\"\"\"insert into water (bodyofwater, zipcode, type, size, quality)\n",
" values ('%s', %s, %s, '%s', '%s')\"\"\" % (body, zipcode, code, area, status))\n",
" except psycopg2.ProgrammingError as e:\n",
" print e, zipcode, i\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e,i])\n",
" pass\n",
" except psycopg2.DataError as e:\n",
" print e, zipcode\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e,i])\n",
" except Exception as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e,i])\n",
" finally:\n",
" conn.commit()\n",
"\n",
" q = q+1\n",
" sys.stdout.write('\\r')\n",
" percentage = (100.0/len(zips))*q\n",
" timer = datetime.now() - starttime\n",
" timer = str(timer)[:-3]\n",
"\n",
" if _type == 'air':\n",
" try:\n",
" size_in_b = os.path.getsize(EPA.home+'/Desktop/air.csv')\n",
" except OSError as e:\n",
" size_in_b = 0\n",
" elif _type == 'water':\n",
" try:\n",
" size_in_b = os.path.getsize(EPA.home+'/Desktop/water.csv')\n",
" except OSError as e:\n",
" size_in_b = 0\n",
" if data:\n",
" sys.stdout.write(\"%-s %.3f%% | Zips Done: %d | Results: %s | Timer: %s | Size: %s\" % ('===>', percentage, q, \"y\", timer, size_in_b))\n",
" sys.stdout.flush()\n",
" listname.append(data)\n",
" if write:\n",
" EPA._write_data(name+'.csv', headers, listname)\n",
" else:\n",
" sys.stdout.write(\"%-s %.3f%% | Zips Done: %d | Results: %s | Timer: %s | Size: %s\" % ('===>', percentage, q, \"n\", timer, size_in_b))\n",
" sys.stdout.flush()\n",
" emptylist.append(i)\n",
"\n",
" cur.close()\n",
" conn.close()\n",
"\n",
"# if write:\n",
"# subprocess.call(\"drive upload -f \"+EPA.home+\"/Desktop/\"+_type+\".csv\", shell=True)\n",
"\n",
"# print \"\\nAdded to Google Drive...\"\n",
" print \"__________________________\"\n",
" print \"Captured:\",len(listname), \"| Non-captured:\", len(emptylist)\n",
" percent = float(len(emptylist))/len(listname)*100\n",
" print \"Percentage of captured data: %d\" % round(percent, 3)\n",
" \n",
" \n",
"class HistoricalEPA(object):\n",
" \n",
" \n",
" def get_historical_data(self, path):\n",
" \"\"\"grabs historical daily EPA data from url and downloads them to 'path'\"\"\"\n",
"\n",
" url = 'http://aqsdr1.epa.gov/aqsweb/aqstmp/airdata/download_files.html#Daily'\n",
" r = requests.get(url)\n",
" soup = BeautifulSoup(r.content)\n",
" gdata = soup.find_all('a')\n",
" gdata = [i for i in gdata if re.search('daily', str(i), flags=re.IGNORECASE)]\n",
" links = []\n",
" pb = 1\n",
" starttime = datetime.now()\n",
" for link in gdata:\n",
" baselink = 'http://aqsdr1.epa.gov/aqsweb/aqstmp/airdata/'\n",
"\n",
" sys.stdout.write('\\r')\n",
" percentage = (100.0/len(gdata))*pb\n",
" timer = datetime.now() - starttime\n",
" timer = str(timer)[:-3]\n",
" sys.stdout.write(\"%-s %.3f%% | %s\" % ('=zip=>', percentage, timer))\n",
" sys.stdout.flush()\n",
" pb+=1\n",
"\n",
" if link.get('href').endswith('zip'):\n",
" link = baselink+link.get('href')\n",
" downloadurl = URL(link)\n",
" name_of_file = path+link[link.rfind('/'):]\n",
" f = open(name_of_file, 'wb')\n",
" f.write(downloadurl.download(cached=False))\n",
" f.close()\n",
" links.append(link)\n",
" \n",
" def unzip_historical_data(self, path):\n",
" \"\"\"unzips historical data files that were downloaded with get_historical_data()\"\"\"\n",
"\n",
" os.chdir(path)\n",
" zipped_files = os.listdir(path) \n",
" zipped_files = [i for i in zipped_files if i.endswith('zip')] \n",
" pb = 0\n",
" starttime = datetime.now()\n",
"\n",
" for i in zipped_files:\n",
"\n",
" subprocess.call(\"unzip \"+path+'/'+i, shell=True) \n",
" subprocess.call(\"rm \"+path+'/'+i, shell=True)\n",
"\n",
" pb+=1\n",
" sys.stdout.write('\\r')\n",
" percentage = (100.0/len(zipped_files))*pb\n",
" timer = datetime.now() - starttime\n",
" sys.stdout.write(\"%-s %.3f%% | %s\" % ('=csv=>', percentage, timer))\n",
" sys.stdout.flush()\n",
" \n",
" \n",
"class WriteToDB(object):\n",
" \n",
" home = expanduser(\"~\")\n",
" \n",
" def __init__(self, name):\n",
" self.name = 'daily_42602_2014.csv'\n",
" \n",
" @classmethod\n",
" def homepath(cls):\n",
" return cls.home\n",
" \n",
" @staticmethod\n",
" def get_file(name):\n",
" \"\"\"retrieves the path to the file newzips_with_lat_llong.csv\"\"\"\n",
"\n",
" for root, dirs, files in os.walk(WriteToDB.home):\n",
" if name in files:\n",
" return os.path.join(root, name)\n",
" \n",
" @staticmethod\n",
" def view_rows(iterable, n):\n",
" \"\"\"previews the first n rows of the document\"\"\"\n",
" for i in iterable:\n",
" print i\n",
" print column[i][0:n]\n",
" print\n",
"\n",
"\n",
" def write_historical_data_to_db(self, path):\n",
" \"\"\"writes all historical data to the environment database\"\"\"\n",
"\n",
" tables = ['ozone','so2','co','no2',\n",
" 'pm_frm_fem_mass','pm_nonfrm_fem_mass','pm_mass','pm_speciation',\n",
" 'winds','temperature','baro_pressure','rh_dewpoint',\n",
" 'haps','vocs','lead']\n",
" names = ['44201','42401','42101','42602',\n",
" '88101','88502','81102','spec',\n",
" 'wind','temp','press','rh_dew',\n",
" 'haps','vocs','lead']\n",
" tables_names = dict(zip(tables, names))\n",
" file_ = WriteToDB.get_file('daily_42101_2014.csv')\n",
" with open(file_, 'rb') as zips:\n",
" reader = csv.reader(zips)\n",
" headers = reader.next()\n",
" headers1 = filter(None, headers)\n",
"\n",
" column = {}\n",
" for h in headers1:\n",
" column[h] = []\n",
"\n",
" for row in reader:\n",
" for h, v in zip(headers, row):\n",
" column[h].append(v)\n",
"\n",
" headers = [re.sub(' ','_',x) for x in headers1]\n",
" headers = [re.sub('1st','first',x) for x in headers]\n",
" headers = ', '.join(str(x) for x in headers)\n",
"\n",
" connect_str = 'dbname=environment host=localhost port=5432'\n",
" conn = psycopg2.connect(connect_str)\n",
" cur = conn.cursor()\n",
"\n",
" for table, name in tables_names.iteritems():\n",
" # cur.execute(\"drop table %s;\" % table)\n",
" try:\n",
" cur.execute(\"\"\"create table %s\n",
" (\n",
" id serial,\n",
" year char(4),\n",
" state_code int,\n",
" county_code varchar(10),\n",
" site_num int,\n",
" parameter_code int,\n",
" poc int,\n",
" latitude numeric(10,6),\n",
" longitude numeric(10,6),\n",
" datum varchar(10),\n",
" parameter_name varchar(25),\n",
" sample_duration varchar(40),\n",
" pollutant_standard varchar(35),\n",
" date_local varchar(12),\n",
" units_of_measure varchar(75),\n",
" event_type varchar(30),\n",
" observation_count int,\n",
" observation_percent numeric(6,2),\n",
" arithmetic_mean numeric,\n",
" first_max_value numeric,\n",
" first_max_hour int,\n",
" aqi varchar(10),\n",
" method_name varchar(25),\n",
" local_site_name varchar(40),\n",
" address varchar(150),\n",
" state_name varchar(20),\n",
" county_name varchar(75),\n",
" city_name varchar(100),\n",
" cbsa_name varchar(65),\n",
" date_of_last_change varchar(10)\n",
" );\"\"\" % table)\n",
"\n",
" cur.execute(\"ALTER SEQUENCE %s_id_seq RESTART;\" % table)\n",
" cur.execute(\"comment on table %s is 'historical data';\" % table)\n",
"\n",
" except psycopg2.ProgrammingError as e:\n",
" connect_str = 'dbname=environment host=localhost port=5432'\n",
" conn = psycopg2.connect(connect_str)\n",
" cur = conn.cursor()\n",
" print e\n",
"\n",
" finally:\n",
" conn.commit()\n",
"\n",
" # path = '/volumes/phtim/perceptionhealth/epa_zipped/'\n",
" files = os.listdir(path)\n",
" files = [n for n in files if name in n]\n",
"\n",
" progress = 0\n",
" starttime = datetime.now()\n",
" for i in files:\n",
"\n",
" year = re.findall('_([0-9]{4})\\.', i)[0]\n",
" cur.execute(\"\"\"copy %s (%s) from\n",
" '%s/%s' delimiter ',' csv header;\"\"\" % (table, headers, path, i))\n",
" cur.execute(\"update %s set year=%s where date_local ~* '.*%s';\" % (table, year, year[-2:]))\n",
" conn.commit()\n",
"\n",
" sys.stdout.write('\\r')\n",
" percentage = (100.0/len(files))*progress\n",
" timer = datetime.now() - starttime\n",
" timer = str(timer)[:-3]\n",
" sys.stdout.write(\"%-s %.3f%% | %s | %s\" % ('='+table+'=>', percentage, timer, i))\n",
" sys.stdout.flush()\n",
" progress+=1\n",
"\n",
" cur.close()\n",
" conn.close()\n",
" \n",
"\n",
"class Weather(object):\n",
" \n",
" @staticmethod\n",
" def heat_index(T, R):\n",
" \"\"\"calculates the heat index given temp and humidity; I use the same heat index method as \n",
" these guys: http://wonder.cdc.gov/wonder/help/Climate/ta_htindx.PDF\"\"\"\n",
" try:\n",
" T, R = int(T), int(R)\n",
" c_1 = -42.379\n",
" c_2 = 2.04901523\n",
" c_3 = 10.14333127\n",
" c_4 = -0.22475541\n",
" c_5 = -6.83783*10**(-3)\n",
" c_6 = -5.481717*10**(-2)\n",
" c_7 = 1.22874*10**(-3)\n",
" c_8 = 8.5282*10**(-4)\n",
" c_9 = -1.99*10**(-6)\n",
" HI = c_1+c_2*T + c_3*R + c_4*T*R + c_5*T**2 + c_6*R**2+ c_7*T**2*R + c_8*T*R**2 + c_9*T**2*R**2\n",
" HI = round(HI, 4)\n",
" return HI\n",
" except:\n",
" return 'n/a'\n",
"\n",
" @staticmethod\n",
" def wind_chill(T, V):\n",
" \"\"\"calculates wind chill given temp and wind speed\"\"\"\n",
"\n",
" try:\n",
" T, V = int(T), int(V)\n",
" degrees = -45 < T < 45\n",
" windspeed = 3 < V < 60\n",
" if degrees and windspeed:\n",
" WC = 35.74 + 0.6215*(T) - 35.75*(V**0.16) + 0.4275*T*(V**0.16)\n",
" WC = round(WC, 4)\n",
" else:\n",
" WC = 'n/a'\n",
" return WC\n",
" except:\n",
" return 'n/a'\n",
" \n",
" @staticmethod\n",
" def extract_nums(string, pattern=\"pattern\"):\n",
" N = re.compile('[0-9]')\n",
" if re.search(N, string):\n",
" value = re.findall(pattern, string, flags=re.IGNORECASE)[0]\n",
" else:\n",
" value = 0\n",
" return value\n",
" \n",
" def get_weather_data(self, alerts=None):\n",
" \"\"\"grabs all weather data and writes it to the weather database\"\"\"\n",
" \n",
" subject = 'Weather - get_weather_data()'\n",
" connect_str = 'dbname=environment host=localhost port=5432'\n",
" conn = psycopg2.connect(connect_str)\n",
" cur = conn.cursor()\n",
" \n",
" name = EPA.file_name\n",
" path = EPA.get_path(name)\n",
" zips = EPA.read_zips(path)\n",
" \n",
" progress = 1\n",
" starttime = datetime.now()\n",
" # zips = zips[0:23] -> testing\n",
" weather_list = []\n",
" zips = zips[15708:]\n",
" for row in zips:\n",
" \n",
" current_date = datetime.now().strftime('%m/%d/%y')\n",
" current_time = datetime.now().strftime('%I:%M:%S')\n",
" \n",
" zipcode = row[0]\n",
" lat = row[1][0][0]\n",
" lon = row[1][1][0]\n",
"\n",
" url = 'http://forecast.weather.gov/MapClick.php?lat='+str(lat)+'&lon='+str(lon)+'&site=all&smap=1#.VQggGRDF8Yc'\n",
" r = requests.get(url)\n",
" r = re.sub(r'[^\\x00-\\x7F]+',' ', r.content).encode('ascii', 'xmlcharrefreplace')\n",
" weather_soup = BeautifulSoup(r)\n",
" gdata = weather_soup.find_all('div', {'id':'current-conditions-body'})\n",
" \n",
" sys.stdout.write('\\r')\n",
" percentage = (100.0/len(zips))*progress\n",
" timer = datetime.now() - starttime\n",
" timer = str(timer)[:-3]\n",
" sys.stdout.write(\"%-s %.3f%% | Timer: %s | %s | Zips done: %s\" % ('===>', percentage, timer, zipcode, progress))\n",
" sys.stdout.flush()\n",
" progress += 1\n",
" \n",
" for data in gdata:\n",
" try:\n",
" temp = data.find_all('p', {'class':'myforecast-current-lrg'})\n",
" temp = re.sub('[^0-9]', '', str(temp)) if re.search('[0-9]', str(temp)) else 0\n",
"\n",
" weather_data = data.find_all('tr')\n",
" weather_data = [i.text.encode('utf-8').strip() for i in weather_data]\n",
" weather_data = [re.sub('\\s{1,}', ' ', i) for i in weather_data]\n",
" \n",
" humidity = Weather.extract_nums(weather_data[0], pattern='[0-9]{1,}')\n",
" wind_speed = Weather.extract_nums(weather_data[1], pattern='([0-9]{1,}) mph')\n",
" barom = Weather.extract_nums(weather_data[2], pattern='[0-9]{1,} in')\n",
" dewpoint = Weather.extract_nums(weather_data[3], pattern='[0-9]{1,}')\n",
" visibility = Weather.extract_nums(weather_data[4], pattern='[0-9]{1,}.{1,} mi')\n",
" last_update = re.search('[0-9]{1,2} [A-Za-z]{3} [0-9]{1,2}:[0-9]{2}', weather_data[-1]).group()\n",
"\n",
" heat_index = Weather.heat_index(temp, humidity)\n",
" wind_chill = Weather.wind_chill(temp, wind_speed)\n",
" \n",
" if alerts:\n",
" alert = weather_soup.find_all('a', {'id':'anchor-hazards'})\n",
" alert = 1 if alert else 0\n",
" else:\n",
" alert = '00'\n",
" \n",
" cur.execute(\"\"\"insert into weather_data \n",
" (\n",
" zipcode, temperature, humidity, wind_speed, barometer,\n",
" dewpoint, visibility, heat_index, wind_chill, alerts, currentdate,\n",
" currenttime, last_update\n",
" )\n",
" values \n",
" (%s, %s, %s, '%s', '%s', %s, '%s', %s, '%s', '%s', '%s', '%s', '%s');\n",
" \"\"\" % (zipcode, temp, humidity, wind_speed, barom,\n",
" dewpoint, visibility, heat_index, wind_chill, alert, \n",
" current_date, current_time, last_update))\n",
" \n",
" weather_list.append([zipcode, temp, humidity, wind_speed, barom,\n",
" dewpoint, visibility, heat_index, wind_chill, alert, \n",
" current_date, current_time, last_update])\n",
"\n",
" except AttributeError as e:\n",
" print e\n",
" except psycopg2.DataError as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, row])\n",
" except psycopg2.ProgrammingError as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, row])\n",
" except requests.exceptions.Timeout as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, row])\n",
" except requests.exceptions.ConnectionError as e:\n",
" print e\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, row])\n",
" except Exception as e:\n",
" print e\n",
" \n",
" finally:\n",
" conn.commit()\n",
" \n",
" cur.close()\n",
" conn.close()\n",
"\n",
"\n",
"class HistoricalWeather(object):\n",
" \n",
" def __init__(self, path):\n",
" self.path = path\n",
"\n",
" @staticmethod\n",
" def get_next_values(string, data, plus):\n",
" previous = None\n",
" next = None\n",
" for index, obj in enumerate(data):\n",
" if obj == string:\n",
" previous = data[index - 1]\n",
" next = data[index + plus]\n",
" return next\n",
" \n",
" @staticmethod\n",
" def find_station_id(zipcode):\n",
" url = 'http://www.wunderground.com/cgi-bin/findweather/getForecast?query='+zipcode\n",
" page = requests.get(url)\n",
" tree = html.fromstring(page.text)\n",
" station = tree.xpath('//code/text()')[0].split()[1]\n",
" return station\n",
" \n",
" def get_station_id(self):\n",
" \"\"\"path is the CSV file you want to read from - newzips_with_lat_llong.csv\"\"\"\n",
" zips = Utility.read_csv(path)\n",
" zips_with_station = []\n",
" no_station = []\n",
" starttime = datetime.now()\n",
" progress = 1\n",
" headers = ['zipcode','minlat','maxlat','minlon','maxlon','ID']\n",
" no_headers = ['zipcode','lat','lon']\n",
" name = '/users/tdobbins/desktop/newzips_with_staton_id.csv'\n",
" error_name = '/users/tdobbins/desktop/no_station_id.csv'\n",
" for i in zips:\n",
" zipcode = row[0]\n",
" miny = row[1]\n",
" minx = row[2]\n",
" maxy = row[3]\n",
" maxx = row[4]\n",
" try:\n",
" url = 'http://www.wunderground.com/cgi-bin/findweather/getForecast?query='+zipcode\n",
" page = requests.get(url)\n",
" tree = html.fromstring(page.text)\n",
" station = tree.xpath('//code/text()')[0].split()[1]\n",
" zips_with_station.append([zipcode, miny, minx, maxy, maxx, station])\n",
" with open(name, 'w') as f:\n",
" writer = csv.writer(f, delimiter=',', quoting=csv.QUOTE_ALL)\n",
" [writer.writerow(headers)]\n",
" [writer.writerow(x) for x in zips_with_station]\n",
" Utility(zips).probar(progress, starttime)\n",
" progress += 1\n",
" except Exception as e:\n",
" no_station.append([zipcode, lat, lon])\n",
" with open(error_name, 'w') as f:\n",
" writer = csv.writer(f, delimiter=',', quoting=csv.QUOTE_ALL)\n",
" [writer.writerow(no_headers)]\n",
" [writer.writerow(x) for x in no_station]\n",
" print e, zipcode\n",
" \n",
" def index_it(self, n):\n",
" zips = Utility.read_csv(self.path, col=\"zipcode\")\n",
" return zips.index(str(n))\n",
" \n",
" def get_historical_weather_data(self, n):\n",
"\n",
" connect_str = 'dbname=environment host=localhost port=5432'\n",
" conn = psycopg2.connect(connect_str)\n",
" cur = conn.cursor()\n",
" subject = 'HistoricalWeather - get_historical_weather_data()'\n",
" \n",
" zips = Utility.read_csv(self.path)\n",
" zips = zips[n:]\n",
" for code in zips:\n",
" zipcode = code[0]\n",
" minlat = code[1]\n",
" maxlat = code[2]\n",
" minlon = code[3]\n",
" maxlon = code[4]\n",
" station_id = code[5]\n",
"\n",
" for year in range(2014, 2015):\n",
" for month in range(1, 13):\n",
" for day in range(1, 32):\n",
"\n",
" # Check if leap year\n",
" if year%400 == 0:\n",
" leap = True\n",
" elif year%100 == 0:\n",
" leap = False\n",
" elif year%4 == 0:\n",
" leap = True\n",
" else:\n",
" leap = False\n",
"\n",
" # Check if already gone through month\n",
" if (month == 2 and leap and day > 29):\n",
" continue\n",
" elif (month == 2 and day > 28):\n",
" continue\n",
" elif (month in [4, 6, 9, 10] and day > 30):\n",
" continue\n",
" \n",
" id_ = 1\n",
" sys.stdout.write('\\r')\n",
" sys.stdout.write(str(month)+'/'+str(day)+'/'+str(year)+' | zipcode: '+zipcode+' ')\n",
" sys.stdout.flush()\n",
"# station = HistoricalWeather.find_station(zipcode) no need to hit it twice. i grabbed all station_ids, cutting hits in half\n",
" url = \"http://www.wunderground.com/history/airport/\"+station_id+\"/\"+str(year)+ \"/\" + str(month) + \"/\" + str(day) + \"/DailyHistory.html&reqdb.zip\"+zipcode\n",
" page = requests.get(url)\n",
" tree = html.fromstring(page.text)\n",
" temp_dew_wind = tree.xpath('//span/text()')\n",
" temp_dew_wind = [re.sub('[^\\x00-\\x7F]*|\\s', '', i).strip().encode() for i in temp_dew_wind]\n",
" temp_dew_wind = filter(None, temp_dew_wind)\n",
" moisture = tree.xpath('//td/text()')\n",
" moisture = [re.sub('[^\\x00-\\x7F]*|\\s', '', i).strip().encode() for i in moisture]\n",
" moisture = filter(None, moisture)\n",
"\n",
" temp = HistoricalWeather.get_next_values('Max Temperature',temp_dew_wind, 1)\n",
" dewpoint = HistoricalWeather.get_next_values('Dew Point',temp_dew_wind , 1) \n",
" wind_speed = HistoricalWeather.get_next_values('Wind Speed',temp_dew_wind , 1)\n",
" humidity = HistoricalWeather.get_next_values('Moisture',moisture , 2)\n",
"\n",
" HI = Weather.heat_index(temp, humidity)\n",
" WC = Weather.wind_chill(temp, wind_speed)\n",
" thedate = str(month)+'/'+str(day)+'/'+str(year)\n",
" \n",
" try:\n",
" cur.execute(\"\"\"insert into historical_weather \n",
" (\n",
" zipcode, temperature, heat_index, wind_chill, thedate\n",
" )\n",
" values\n",
" (%s, '%s', '%s', '%s', '%s');\n",
" \"\"\" % (zipcode, temp, HI, WC, thedate))\n",
" \n",
" except psycopg2.Error as e:\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, code])\n",
" print e, id_\n",
" connect_str = 'dbname=environment host=localhost port=5432'\n",
" conn = psycopg2.connect(connect_str)\n",
" cur = conn.cursor()\n",
" \n",
" except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, code])\n",
" print e, id_\n",
" except Exception as e:\n",
" Utility.send_email(EPA.TO, EPA.FROM, subject, [e, code])\n",
" print e, id_\n",
" \n",
" finally:\n",
" conn.commit()\n",
" id_ += 1\n",
" \n",
" cur.close()\n",
" conn.close()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# using a headless browser\n",
"\n",
"import re\n",
"import mechanize\n",
"from selenium import webdriver\n",
"import time\n",
"import requests\n",
"from bs4 import BeautifulSoup\n",
"\n",
"class EPAdata(object):\n",
" \"\"\"EPAdata class\"\"\"\n",
" \n",
" def __init__(self, zip_code):\n",
" \"\"\"returns an EPAdata object and fires up the PhantonJS driver\"\"\"\n",
" \n",
" self.zip_code = zip_code\n",
" self.driver = webdriver.PhantomJS()\n",
" \n",
" def _gdata(self, tag, attr, value):\n",
" \"\"\"visits the given URL and, after redirection, retrieves the desired URL\"\"\"\n",
" \n",
" url = 'http://www.epa.gov/myenv/myenview2.find?zipcode='+str(self.zip_code)+'&GO=go'\n",
" self.driver.get(url)\n",
"\n",
" while url == self.driver.current_url:\n",
" time.sleep(0)\n",
"\n",
" desired_url = self.driver.current_url\n",
" self.driver.get(desired_url)\n",
" r = requests.get(desired_url)\n",
" soup = BeautifulSoup(r.content)\n",
" gen_data = soup.find_all(tag,{attr:value})\n",
" return gen_data\n",
" \n",
" def _getlink(self, gen_data, regex):\n",
" \"\"\"finds the link containing the desired data\"\"\"\n",
"\n",
" for i in gen_data:\n",
" for l in i('a'):\n",
" base = 'http://www.epa.gov/myenv/'\n",
"\n",
" if re.search(regex, l.get('href')):\n",
" link = re.sub(' ','',base+l.get('href'))\n",
" return link\n",
"\n",
" def _gdata2(self, link, tag, attr, value):\n",
" \"\"\"navigates to the link containing the desired data, parses through the iframes\n",
" and returns the snippet HTML with the desired data and passes it on for further scraping\"\"\"\n",
"\n",
" self.driver.get(link)\n",
" self.driver.execute_script(\"window.scrollTo(0, document.body.scrollHeight);\")\n",
" iframes = self.driver.find_elements_by_tag_name(\"iframe\")\n",
"\n",
" for iframe in iframes:\n",
" self.driver.switch_to_default_content()\n",
" self.driver.switch_to_frame(iframe)\n",
" output = self.driver.page_source\n",
" soup = BeautifulSoup(output)\n",
" moredata = soup.find_all(tag,{attr:value})\n",
" if moredata:\n",
" yield moredata\n",
" else:\n",
" continue\n",
" \n",
" \n",
" def aqi(self):\n",
" \"\"\"opens webdriver to given url and then gets the desired url after\n",
" redirection, continues to parse the redirected url; returns realtime air quality index.\"\"\"\n",
" \n",
" gen_data = self._gdata('div', 'class', 'col2')\n",
" link = self._getlink(gen_data, 'MyAir')\n",
" moredata = self._gdata2(link, 'span', 'id', 'airnowCurr')\n",
" for item in moredata:\n",
" try:\n",
" item = BeautifulSoup(str(item))\n",
" current_pm = item.find_all('td',{'valign':'middle'})[0].text.encode('utf-8')\n",
" return {\"Current PM2.5\": current_pm}\n",
" except IndexError as e:\n",
" pass\n",
" \n",
" self.driver.close()\n",
" \n",
" def water(self):\n",
" \"\"\"opens webdriver to given url and then gets the desired url after\n",
" redirection, continues to parse the redirected url; returns water quality data.\"\"\"\n",
" \n",
" gen_data = self._gdata('div', 'class', 'col3')\n",
" link = self._getlink(gen_data, 'MyWater')\n",
" moredata = self._gdata2(link, 'table', 'style', 'background-color: white;')\n",
"\n",
" for item in moredata:\n",
" if item:\n",
" item = BeautifulSoup(str(item))\n",
" water_data = [i.text.strip().encode('utf-8') for i in \n",
" item.find_all('tr',{'class':re.compile('no1|yes1')})]\n",
" \n",
" keys = []\n",
" values = []\n",
" for i in water_data:\n",
" k = i.split()[:4] if len(i.split()) > 4 else i.split()[:-1]\n",
" v = i.split()[-1] if 'Not Assessed' not in i else ' '.join(i.split()[-2:])\n",
" keys.append(k)\n",
" values.append(v)\n",
" keys = [' '.join(i) for i in keys]\n",
" mydict = {re.sub('Cause', '', k):v for k,v in zip(keys, values)}\n",
" mydict['ZIP CODE'] = self.zip_code\n",
" return mydict\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
},
"widgets": {
"state": {},
"version": "1.1.2"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment