Skip to content

Instantly share code, notes, and snippets.

@alanbernstein
Created July 15, 2017 04:35
Show Gist options
  • Save alanbernstein/f23ffee60854083bfb1f9a70e23f110a to your computer and use it in GitHub Desktop.
Save alanbernstein/f23ffee60854083bfb1f9a70e23f110a to your computer and use it in GitHub Desktop.
Collect and cache historical Weather Underground (wunderground) data as JSON files.
import os
import json
import requests
import time
from datetime import datetime as dt
from datetime import timedelta
# Output directory for cached files, taken from the CACHE environment variable.
# Falls back to the current directory so that CACHE_BASE is always a string:
# os.getenv() returns None for an unset variable, and concatenating None with
# a path suffix raises TypeError.
CACHE_BASE = os.getenv('CACHE', '.')  # output directory
def main():
    """Fetch and cache daily history for every day in a fixed date range.

    Walks backwards one day at a time from ``end_date`` down to and including
    ``start_date``, calling ``WunderCli.get_history`` for each day.
    """
    wc = WunderCli()
    start_date = dt(2009, 1, 1)
    end_date = dt(2015, 12, 31)

    date = end_date
    # `>=` (not `>`) so that start_date itself is fetched as well.
    while date >= start_date:
        called = wc.get_history(date)
        date -= timedelta(days=1)
        if called:
            # Pause only after a real API request; cache hits skip the wait.
            # NOTE(review): presumably 6 s keeps the script under an API
            # rate limit -- confirm against the account's quota.
            time.sleep(6)
class WunderCli(object):
    """Small Weather Underground API client that caches daily history as JSON.

    Each day's history response is stored in its own JSON file, named from the
    location and the date, so repeated runs only call the API for missing days.
    """

    default_location = 'nyc'
    default_api_key = os.getenv('WUNDERGROUND_KEY', None)

    # forecast type -> URL path segment ('history' takes a formatted date)
    type_map = {
        'daily': 'forecast/',
        'extended': 'forecast10day/',
        'now': 'conditions/alerts/',
        'hourly': 'hourly/',
        'history': 'history_%s/',
    }

    # location name -> query string used in the request URL
    location_map = {
        'austin': 'TX/Austin',
        'nyc': 'NY/New_York',
        None: 'autoip'
    }

    # cache params
    time_format = '%Y%m%d'
    cachefile_base = CACHE_BASE + '/wunderground-history/'
    daily_cachefile_pattern = cachefile_base + 'daily-%s-%s.json'
    # When True, get_history() ignores an existing cache file and re-fetches.
    force_refresh = False

    # Seconds to wait on the HTTP request before giving up; without a timeout
    # a stalled connection would block the whole run indefinitely.
    request_timeout = 30

    def __init__(self, location=None, api_key=None):
        """Create a client.

        Args:
            location: key into ``location_map``; a falsy value selects
                ``default_location``.
            api_key: API key; a falsy value selects ``default_api_key``.
        """
        self.location = location or self.default_location
        self.api_key = api_key or self.default_api_key

    def get_history(self, date):
        """Make sure the history for ``date`` is present in the cache.

        Example request URL:
        http://api.wunderground.com/api/<api_key>/history_20120101/q/NY/New_York.json

        Args:
            date: the day to fetch (anything with a ``strftime`` method).

        Returns:
            True if the API was called, False if the cache file was used.

        Raises:
            ApiError: if the API response reports an error (nothing is cached).
        """
        cachefile = self.daily_cachefile_pattern % (
            self.location, date.strftime(self.time_format))
        print(cachefile)

        if not self.force_refresh and os.path.exists(cachefile):
            print(' found in cache!')
            # Parsing the cached file surfaces a corrupt/truncated entry as an
            # exception instead of silently treating it as valid.
            with open(cachefile, 'r') as f:
                json.load(f)
            return False

        print(' retrieving from api')
        data = self.api_call(self.location, 'history', date)

        # Create the cache directory on first use so the write cannot fail
        # just because the directory is missing.
        cache_dir = os.path.dirname(cachefile)
        if cache_dir and not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        with open(cachefile, 'w') as f:
            json.dump(data, f)
        return True

    def api_call(self, location, forecast_type, date=None):
        """Request one resource from the API and return the decoded JSON.

        Args:
            location: key into ``location_map``.
            forecast_type: key into ``type_map``.
            date: required when ``forecast_type`` is ``'history'``; formatted
                with ``time_format`` into the URL.

        Returns:
            The decoded JSON response.

        Raises:
            ApiError: if the response carries an ``error`` entry under
                ``response``.
        """
        base_url = 'http://api.wunderground.com/api/%s/' % self.api_key
        forecast_str = self.type_map[forecast_type]
        if forecast_type == 'history':
            forecast_str = forecast_str % date.strftime(self.time_format)
        url = base_url + forecast_str + 'q/%s.json' % self.location_map[location]
        print(' %s' % url)

        resp = requests.get(url, timeout=self.request_timeout)
        resp_json = json.loads(resp.content)

        # Errors are reported as response -> error -> description; look there
        # rather than at the top level of the payload.
        response = resp_json.get('response') if isinstance(resp_json, dict) else None
        error = response.get('error') if isinstance(response, dict) else None
        if error:
            description = error.get('description') if isinstance(error, dict) else None
            raise ApiError(description or error)
        return resp_json
class ApiError(Exception):
    """Raised when the weather API response reports an error."""
# Run the download loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment