Pull latest readings from Met Office marine stations
# -*- coding: utf-8 -*-
"""
# Pull latest 24 hour data every x hours
# 46 locations, 402,960 data points per year
marine.find({'location':'aberporth'})
marine.find({'location':'aberporth', 'day':19, 'month':3, 'year':2011})
marine.find({'location':'aberporth', 'datetime': datetime(19, 3, 2011, 21, 0)})
marine.find({'datetime': datetime(19, 3, 2011, 21, 0)})
"""
keywords = "uk marine weather met office mongodb"
import csv
from datetime import datetime
from functools import partial
import urllib2
from lxml import etree
from pprint import pprint
import pymongo
MET_OFFICE_HOST = 'http://www.metoffice.gov.uk'
STATION_TYPES = {1: 'buoy', 2: 'buoy', 3: 'buoy', 4: 'vessel',
                 5: 'island', 6: 'land', 7: 'other'}

DATA_HEADER = ('station', 'date', 'time', 'weather', 'air_temp', 'dew_point',
               'sea_temp', 'humidity', 'wind_direction', 'wind_speed',
               'visibility', 'air_pressure', 'wave_height', 'wave_period')
def coerce_to(conv, val):
    """Strip whitespace from val and convert it with conv. Empty or
    unparseable values coerce to None.
    """
    val = val.strip()
    try:
        val = conv(val) if val else None
    except ValueError:
        ##TODO: log this; pass a logger instance to coerce_to()
        val = None
    return val

to_int = partial(coerce_to, int)
to_float = partial(coerce_to, float)
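# Behaviour sketch (derived from coerce_to above): empty strings and values
# that fail conversion both coerce to None.
#   to_int('12')    -> 12
#   to_int('')      -> None
#   to_float('n/a') -> None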
class HeadRequest(urllib2.Request):
    """Request subclass that issues an HTTP HEAD request, so response
    headers can be inspected without downloading the body."""
    def get_method(self):
        return "HEAD"

def to_csv_url(html_url):
    """Return the URL of the .csv data file given a _table.html URL."""
    path = urllib2.splithost(urllib2.splittype(html_url)[1])[1]
    path = path.replace('_table.html', '.csv')
    return ''.join([MET_OFFICE_HOST, path])
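# For illustration (the path is hypothetical; real redirect values in
# locations.xml may differ):
#   to_csv_url('http://www.metoffice.gov.uk/weather/marine/observations/aberporth_table.html')
#   -> 'http://www.metoffice.gov.uk/weather/marine/observations/aberporth.csv'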
def get_sources():
    """Yield a dict describing each marine observation station listed in
    the Met Office locations.xml feed.
    """
    ##TODO: use urllib2 to check etag
    src_url = 'http://www.metoffice.gov.uk/weather/marine/observations/locations.xml'
    tree = etree.parse(src_url)
    root = tree.getroot()
    for elem in root:
        yield dict(
            name=elem.get('name'),
            type=int(elem.get('type')),
            url=to_csv_url(elem.get('redirect')),
            location=(float(elem.get('lat')), float(elem.get('lng')))
        )
def get_data(source):
    """For a source, try to retrieve its data. Adds a dict of response
    metadata and parsed readings to the source dict under 'data'.
    """
    src_url = source['url']
    ts = datetime.now()
    data = dict(date=ts)
    try:
        response = urllib2.urlopen(HeadRequest(src_url))
        ##TODO: handle etag/last-modified caching
        data.update(dict(
            data=parse_data(urllib2.urlopen(src_url)),
            last_modified=response.headers['last-modified'],
            etag=response.headers['etag'],
            date=response.headers['date']
        ))
        source['data'] = data
    except urllib2.HTTPError:
        ## On failure, record only the request timestamp
        source['data'] = data
    finally:
        return source
def parse_data(data_file):
    """Expects a file-like object containing csv formatted data, with header
    row. Returns a generator that yields one dict per hourly reading.
    """
    #TODO: Check headers are as expected, else critical error
    def processor(row):
        row['date'] = datetime.strptime(row['date'], '%d %b %Y')
        row['air_temp'] = to_float(row['air_temp'])
        row['dew_point'] = to_float(row['dew_point'])
        row['sea_temp'] = to_float(row['sea_temp'])
        row['humidity'] = to_float(row['humidity'])
        row['wind_direction'] = to_int(row['wind_direction'])
        row['wind_speed'] = to_int(row['wind_speed'])
        row['visibility'] = to_int(row['visibility'])
        row['air_pressure'] = to_float(row['air_pressure'])
        row['wave_height'] = to_float(row['wave_height'])
        row['wave_period'] = to_int(row['wave_period'])
        ## Collapse the separate date and time fields into a single datetime
        row['datetime'] = datetime(
            year=row['date'].year,
            month=row['date'].month,
            day=row['date'].day,
            hour=int(row['time'][:2]),
            minute=0)
        del row['station']
        del row['time']
        del row['date']
        return row
    ## Advance the data file one line to ignore headers
    data_file.readline()
    rdr = csv.DictReader(data_file, fieldnames=DATA_HEADER)
    return (processor(row) for row in rdr)
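# A yielded row, before the location subdocument is merged in, is shaped
# roughly like this (values illustrative, not real readings):
#   {'weather': 'Cloudy', 'air_temp': 9.4, 'dew_point': 7.1, 'sea_temp': 8.9,
#    'humidity': 86.0, 'wind_direction': 270, 'wind_speed': 14,
#    'visibility': 10, 'air_pressure': 1012.0, 'wave_height': 1.2,
#    'wave_period': 6, 'datetime': datetime(2011, 4, 17, 9, 0)}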
def save_and_merge(source, collection):
    """Given an iterable of hourly readings for a location, save
    unseen readings to mongo.
    """
    if 'data' not in source['data']:
        return source
    src = source['data']['data']
    ## For each entry (hourly reading), upsert on (datetime, location name)
    for row in src:
        spec = {'datetime': row['datetime'], 'location.name': source['name']}
        row['location'] = dict(name=source['name'], type=source['type'],
                               position=source['location'])
        doc = {'$set': row}
        collection.update(spec, doc, upsert=True, safe=True, multi=False)
    return source
def index(coll):
    """Create the indexes that the example queries and geo lookups rely on."""
    coll.create_index([('location.name', pymongo.ASCENDING),
                       ('location.type', pymongo.ASCENDING),
                       ('datetime', pymongo.DESCENDING)])
    coll.create_index([('location.position', pymongo.GEO2D)])
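# The GEO2D index supports legacy coordinate-pair proximity queries, e.g.
# (coordinates illustrative):
#   db.marine.find({'location.position': {'$near': [52.1, -4.5]}}).limit(5)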
def pretty_location(source):
    """Print a short status report for a source."""
    print source['name']
    if 'data' not in source['data']:
        print ' No data available at %s' % source['data']['date']
    else:
        print ' Got data'

if __name__ == '__main__':
    mongoconn = pymongo.Connection('127.0.0.1')
    db = mongoconn.weather
    marine_readings = db.marine

    data = get_sources()
    data = (get_data(source) for source in data)
    for source in data:
        pretty_location(source)
        save_and_merge(source, marine_readings)
    index(marine_readings)
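# To pull the latest 24-hour data "every x hours" as the module docstring
# suggests, the script could be scheduled from cron; a sketch, with a
# hypothetical path, fetching every six hours:
#   0 */6 * * * python /path/to/marine_readings.py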