Created
April 17, 2011 15:54
-
-
Save robcowie/924155 to your computer and use it in GitHub Desktop.
Pull latest readings from Met Office marine stations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""Pull the latest 24 hours of readings from Met Office marine stations.

Intended to be run every few hours; with 46 locations this collects
roughly 402,960 data points per year.

Example queries against the resulting MongoDB collection::

    marine.find({'location': 'aberporth'})
    marine.find({'location': 'aberporth', 'day': 19, 'month': 3, 'year': 2011})
    marine.find({'location': 'aberporth', 'datetime': datetime(19, 3, 2011, 21, 0)})
    marine.find({'datetime': datetime(19, 3, 2011, 21, 0)})
"""
keywords = "uk marine weather met office mongodb"
import csv | |
from datetime import datetime | |
from functools import partial | |
import urllib2 | |
from lxml import etree | |
from pprint import pprint | |
import pymongo | |
## Base URL of the Met Office site; joined with station-relative paths.
MET_OFFICE_HOST = 'http://www.metoffice.gov.uk'

## Numeric station-type codes from locations.xml mapped to readable names.
STATION_TYPES = {1: 'buoy', 2: 'buoy', 3: 'buoy', 4: 'vessel',
                 5: 'island', 6: 'land', 7: 'other'}

## Column order of the Met Office marine CSV feeds (their own header row is
## skipped by parse_data and these names are used instead).
DATA_HEADER = ('station', 'date', 'time', 'weather', 'air_temp', 'dew_point',
               'sea_temp', 'humidity', 'wind_direction', 'wind_speed',
               'visibility', 'air_pressure', 'wave_height', 'wave_period')
def coerce_to(conv, val):
    """Strip *val* and convert it with *conv* (e.g. ``int`` or ``float``).

    Returns None when the stripped string is empty or the conversion
    raises ValueError, so missing or garbled CSV cells become nulls
    instead of aborting the import.
    """
    val = val.strip()
    try:
        val = conv(val) if val else None
    except ValueError:
        # TODO: log the bad value; pass a logger instance in.
        val = None
    return val

## Convenience converters for the numeric CSV fields.
to_int = partial(coerce_to, int)
to_float = partial(coerce_to, float)
class HeadRequest(urllib2.Request):
    """A urllib2 Request that issues an HTTP HEAD instead of GET.

    Used to read caching headers (etag / last-modified) without
    downloading the CSV body.
    """

    def get_method(self):
        return "HEAD"
def to_csv_url(html_url):
    """Return the URL of a station's .csv feed given its .html table URL."""
    # Drop scheme and host, keeping the path (plus any query string).
    path = urllib2.splithost(urllib2.splittype(html_url)[1])[1]
    path = path.replace('_table.html', '.csv')
    return ''.join([MET_OFFICE_HOST, path])
def get_sources():
    """Yield one dict per marine station listed in the Met Office XML index.

    Each dict has keys: ``name``, ``type`` (int code, see STATION_TYPES),
    ``url`` (the station's CSV feed) and ``location`` ((lat, lng) floats).
    """
    ##TODO: use urllib2 to check etag
    src_url = 'http://www.metoffice.gov.uk/weather/marine/observations/locations.xml'
    # lxml can parse straight from a URL.
    root = etree.parse(src_url).getroot()
    for elem in root:
        yield dict(
            name=elem.get('name'),
            type=int(elem.get('type')),
            url=to_csv_url(elem.get('redirect')),
            location=(float(elem.get('lat')), float(elem.get('lng'))),
        )
def get_data(source):
    """Fetch and parse a station's CSV feed, storing the result on *source*.

    Adds a ``data`` dict to *source* containing at least ``date`` (the
    fetch timestamp). On success it also holds ``data`` (a generator of
    parsed reading dicts, see parse_data) and the response's
    last-modified / etag / date headers. On any network or HTTP failure
    only the timestamp is recorded, so downstream code can report
    "no data" for the station without crashing.

    Returns *source* for pipeline-style chaining.

    Note: the original version did ``return source`` inside ``finally``,
    which silently swallowed *every* exception (e.g. a KeyError from a
    missing header) and could leave ``source['data']`` unset. Only URL
    errors are handled now; anything else propagates.
    """
    src_url = source['url']
    # Fallback payload used when the fetch fails.
    data = dict(date=datetime.now())
    try:
        # HEAD request first, to pick up the caching headers cheaply.
        response = urllib2.urlopen(HeadRequest(src_url))
        ##TODO: handle etag/last-modified caching
        data.update(dict(
            data=parse_data(urllib2.urlopen(src_url)),
            last_modified=response.headers['last-modified'],
            etag=response.headers['etag'],
            date=response.headers['date'],
        ))
    except urllib2.URLError:
        # URLError covers HTTPError and connection/DNS failures alike;
        # keep just the timestamp so callers see "no data at this poll".
        pass
    source['data'] = data
    return source
def parse_data(data_file):
    """Parse a Met Office marine CSV file into reading dicts.

    *data_file* is a file-like object whose first line is a header row
    and whose columns follow DATA_HEADER. Returns a generator yielding
    one dict per row: numeric fields are coerced via to_int/to_float
    (None for blank or unparsable cells) and a ``datetime`` field is
    built from the date and time columns, truncated to the hour. The
    redundant station/date/time columns are dropped from each row.
    """
    #TODO: Check headers are as expected, else critical error
    float_fields = ('air_temp', 'dew_point', 'sea_temp', 'humidity',
                    'air_pressure', 'wave_height')
    int_fields = ('wind_direction', 'wind_speed', 'visibility', 'wave_period')

    def processor(row):
        # Date cells look like '19 Mar 2011'.
        day = datetime.strptime(row['date'], '%d %b %Y')
        for field in float_fields:
            row[field] = to_float(row[field])
        for field in int_fields:
            row[field] = to_int(row[field])
        # Readings are hourly: keep the hour from 'time', zero the minutes.
        row['datetime'] = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=int(row['time'][:2]),
            minute=0)
        # 'station' lives on the source dict; date/time folded into
        # 'datetime' above.
        del row['station']
        del row['time']
        del row['date']
        return row

    ## Advance past the file's own header row; DictReader uses DATA_HEADER.
    data_file.readline()
    rdr = csv.DictReader(data_file, fieldnames=DATA_HEADER)
    return (processor(row) for row in rdr)
def save_and_merge(source, collection):
    """Upsert a station's hourly readings into a MongoDB *collection*.

    *source* is a station dict as produced by get_data(). If the fetch
    failed (no parsed readings present) the source is returned
    untouched. Each reading is keyed on (datetime, location name), so
    re-polling overlapping 24-hour windows never duplicates documents.

    Returns *source* for chaining.
    """
    if 'data' not in source['data']:
        # Fetch failed for this station; nothing to persist.
        return source
    readings = source['data']['data']
    for reading in readings:
        # Unique key for the upsert: one document per station per hour.
        spec = {'datetime': reading['datetime'],
                'location.name': source['name']}
        # Embed the station metadata in each reading document.
        reading['location'] = dict(name=source['name'], type=source['type'],
                                   position=source['location'])
        collection.update(spec, {'$set': reading},
                          upsert=True, safe=True, multi=False)
    return source
def index(coll):
    """Ensure the indexes needed by the query patterns in the module docstring.

    Creates a compound index for name/type/time lookups and a 2d
    geospatial index on the station position.
    """
    coll.create_index([('location.name', 1),
                       ('location.type', 1),
                       ('datetime', pymongo.DESCENDING)])
    coll.create_index([('location.position', pymongo.GEO2D)])
def pretty_location(source):
    """Print a short status report for a fetched station.

    Single-argument parenthesised print is used so the behaviour is
    identical under Python 2 while also running under Python 3.
    """
    print(source['name'])
    if 'data' not in source['data']:
        print(' No data available at %s' % source['data']['date'])
    else:
        print(' Got data')
if __name__ == '__main__':
    # Local MongoDB; readings live in the weather.marine collection.
    mongoconn = pymongo.Connection('127.0.0.1')
    marine_readings = mongoconn.weather.marine
    # Lazy pipeline: each station's CSV is fetched as it is consumed.
    sources = (get_data(source) for source in get_sources())
    for source in sources:
        pretty_location(source)
        save_and_merge(source, marine_readings)
    # (Re)create indexes once all readings are stored.
    index(marine_readings)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment