Skip to content

Instantly share code, notes, and snippets.

@dylanvee
Created July 6, 2011 23:26
Show Gist options
  • Save dylanvee/1068587 to your computer and use it in GitHub Desktop.
Save dylanvee/1068587 to your computer and use it in GitHub Desktop.
mysql-ctypes test case
#!/usr/local/bin/python
# ushcn_to_mysql.py - A script for importing USHCN data into MySQL
# Copyright (c) 2010, Center for Embedded Networked Sensing (CENS).
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of CENS nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL CENS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Example function for finding the ID of the nearest weather station
# Modified from slide 8 of http://www.arubin.org/files/geo_search.pdf
# def find_nearest_station_id(cursor, latitude, longitude):
# cursor.execute("""SELECT *, 3956 * 2 * ASIN(SQRT(POWER(SIN((%s - abs(X(station.location))) *
# pi()/180 / 2), 2) + COS(%s * pi()/180 ) * COS(abs(X(station.location)) *
# pi()/180) * POWER(SIN((%s - Y(station.location)) * pi()/180 / 2), 2) )) as distance
# FROM USHCN_Stations station
# ORDER BY distance limit 1;""", (latitude, latitude, longitude))
# result = cursor.fetchone()
# return result[0]
import argparse
import MySQLdb
import sys
import atexit
import tempfile
import os
import urllib
import gzip
def get_args():
    """Parse the command line and return the supplied options as a dict.

    Recognised options are --host, --port, --user, --passwd, --db (required)
    and --drop.  argparse reports an error and exits when --db is missing.

    Returns:
        dict: maps option name ('host', 'port', 'user', 'passwd', 'db',
        'drop') to its parsed value.  Options that were not given, or whose
        value is falsy (empty string, port 0), are omitted so the caller can
        forward the remaining entries as keyword arguments.
    """
    parser = argparse.ArgumentParser(description=(
        'Imports temperature data from the United States Historical '
        'Climatology Network into a MySQL database.'))
    parser.add_argument('--host', type=str, help='MySQL hostname or IP address')
    parser.add_argument('--port', type=int, help='MySQL port')
    parser.add_argument('--user', type=str, help='MySQL user')
    parser.add_argument('--passwd', type=str, help='MySQL password')
    parser.add_argument('--db', type=str, required=True, help='MySQL database')
    parser.add_argument('--drop', type=str, help='Whether to drop existing tables, if necessary')
    parsed = parser.parse_args()

    # Build the result by filtering instead of deleting keys from a dict while
    # iterating over it, which only works when keys() returns a list copy.
    option_names = ('host', 'port', 'user', 'passwd', 'db', 'drop')
    supplied = {}
    for option_name in option_names:
        option_value = getattr(parsed, option_name)
        if option_value:
            supplied[option_name] = option_value
    return supplied
def drop_tables(cursor):
    """Drop each USHCN table if it exists.

    The tables are dropped one statement at a time in a fixed order:
    measurements first, then the three flag tables, stations, and states.

    Args:
        cursor: Database cursor whose ``execute`` runs each DROP statement.
    """
    tables_in_drop_order = (
        'USHCN_Measurements',
        'USHCN_Source_Flags',
        'USHCN_Quality_Flags',
        'USHCN_Measurement_Flags',
        'USHCN_Stations',
        'USHCN_States',
    )
    for table_name in tables_in_drop_order:
        cursor.execute('DROP TABLE IF EXISTS `%s`;' % table_name)
def create_tables(cursor):
    """Create the six USHCN tables.

    The DDL statements are executed one by one in the order states, stations,
    measurement flags, quality flags, source flags, measurements.  They are
    plain ``CREATE TABLE`` statements (no ``IF NOT EXISTS``).

    Args:
        cursor: Database cursor whose ``execute`` runs each statement.
    """
    # The SQL text is kept flush-left inside the literals so the statements
    # sent to the server are unchanged.
    ddl_statements = (
        """CREATE TABLE `USHCN_States` (
`id` char(2) NOT NULL COMMENT 'The first two digits of each station ID is a state ID.',
`code` char(2) NOT NULL COMMENT 'Standard two-character abbreviation.',
`name` varchar(14) NOT NULL COMMENT 'Full state name.',
PRIMARY KEY (`id`),
UNIQUE KEY `code` (`code`),
UNIQUE KEY `name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;""",
        """CREATE TABLE `USHCN_Stations` (
`id` char(6) NOT NULL COMMENT 'U.S. Cooperative Observer Network station identification code. The first two digits of each station ID is a state ID.',
`location` point NOT NULL COMMENT 'The latitude and longitude of the station.',
`elevation` decimal(5,1) COMMENT 'The elevation of the station, in meters.',
`state_code` char(2) NOT NULL COMMENT 'The state where the station is located.',
`name` varchar(30) NOT NULL COMMENT 'The name of the station location.',
`component_1` char(6) COMMENT 'The station ID for the first station (if applicable; in chronologic order) whose records were joined with those of the HCN site to form a longer time series.',
`component_2` char(6) COMMENT 'The station ID for the second station (if applicable; in chronologic order) whose records were joined with those of the HCN site to form a longer time series.',
`component_3` char(6) COMMENT 'The station ID for the third station (if applicable; in chronologic order) whose records were joined with those of the HCN site to form a longer time series.',
`utc_offset` int NOT NULL COMMENT 'The time difference between Coordinated Universal Time (UTC) and local standard time at the station (i.e., the number of hours that must be added to local standard time to match UTC).',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;""",
        """CREATE TABLE `USHCN_Measurement_Flags` (
`flag` char(1) NOT NULL COMMENT 'Measurement flag.',
`meaning` text NOT NULL COMMENT 'Measurement flag meaning.',
PRIMARY KEY (`flag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;""",
        """CREATE TABLE `USHCN_Quality_Flags` (
`flag` char(1) NOT NULL COMMENT 'Quality flag.',
`meaning` text NOT NULL COMMENT 'Quality flag meaning.',
PRIMARY KEY (`flag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;""",
        """CREATE TABLE `USHCN_Source_Flags` (
`flag` char(1) NOT NULL COMMENT 'Source flag.',
`meaning` text NOT NULL COMMENT 'Source flag meaning.',
PRIMARY KEY (`flag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;""",
        """CREATE TABLE `USHCN_Measurements` (
`date` date NOT NULL COMMENT 'The measurement date.',
`station_id` char(6) NOT NULL COMMENT 'U.S. Cooperative Observer Network station identification code.',
`temp_min` int COMMENT 'Minimum temperature, in degrees Fahrenheit.',
`temp_max` int COMMENT 'Maximum temperature, in degrees Fahrenheit.',
`temp_min_mflag` char(1) COMMENT 'Measurement flag for temp_min.',
`temp_min_qflag` char(1) COMMENT 'Quality flag for temp_min.',
`temp_min_sflag` char(1) COMMENT 'Source flag for temp_min.',
`temp_max_mflag` char(1) COMMENT 'Measurement flag for temp_max.',
`temp_max_qflag` char(1) COMMENT 'Quality flag for temp_max.',
`temp_max_sflag` char(1) COMMENT 'Source flag for temp_max.',
PRIMARY KEY (`date`,`station_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;""",
    )
    for ddl in ddl_statements:
        cursor.execute(ddl)
def import_state_data(db, cursor):
    """Insert the 48 state rows into `USHCN_States` and commit.

    All rows are sent in a single multi-row INSERT statement.

    Args:
        db: Database connection; ``commit`` is called after the insert.
        cursor: Cursor whose ``execute`` runs the INSERT.
    """
    # (code, name) pairs listed in ID order: a state's ID is its 1-based
    # position in this table, zero-padded to two digits.
    states_in_id_order = (
        ('AL', 'Alabama'),
        ('AZ', 'Arizona'),
        ('AR', 'Arkansas'),
        ('CA', 'California'),
        ('CO', 'Colorado'),
        ('CT', 'Connecticut'),
        ('DE', 'Delaware'),
        ('FL', 'Florida'),
        ('GA', 'Georgia'),
        ('ID', 'Idaho'),
        ('IL', 'Illinois'),
        ('IN', 'Indiana'),
        ('IA', 'Iowa'),
        ('KS', 'Kansas'),
        ('KY', 'Kentucky'),
        ('LA', 'Louisiana'),
        ('ME', 'Maine'),
        ('MD', 'Maryland'),
        ('MA', 'Massachusetts'),
        ('MI', 'Michigan'),
        ('MN', 'Minnesota'),
        ('MS', 'Mississippi'),
        ('MO', 'Missouri'),
        ('MT', 'Montana'),
        ('NE', 'Nebraska'),
        ('NV', 'Nevada'),
        ('NH', 'New Hampshire'),
        ('NJ', 'New Jersey'),
        ('NM', 'New Mexico'),
        ('NY', 'New York'),
        ('NC', 'North Carolina'),
        ('ND', 'North Dakota'),
        ('OH', 'Ohio'),
        ('OK', 'Oklahoma'),
        ('OR', 'Oregon'),
        ('PA', 'Pennsylvania'),
        ('RI', 'Rhode Island'),
        ('SC', 'South Carolina'),
        ('SD', 'South Dakota'),
        ('TN', 'Tennessee'),
        ('TX', 'Texas'),
        ('UT', 'Utah'),
        ('VT', 'Vermont'),
        ('VA', 'Virginia'),
        ('WA', 'Washington'),
        ('WV', 'West Virginia'),
        ('WI', 'Wisconsin'),
        ('WY', 'Wyoming'),
    )
    value_tuples = [
        "('%02d','%s','%s')" % (position, code, name)
        for position, (code, name) in enumerate(states_in_id_order, 1)
    ]
    insert_sql = (
        'INSERT INTO `USHCN_States` (`id`, `code`, `name`) VALUES\n'
        + ',\n'.join(value_tuples)
        + ';'
    )
    cursor.execute(insert_sql)
    db.commit()
def import_station_data(db, cursor):
    """Load the USHCN station list into `USHCN_Stations` and commit.

    The station file is downloaded into the system temp directory the first
    time it is needed and reused on later runs.  Each non-blank line is split
    on whitespace: the first five tokens are id, latitude, longitude,
    elevation and state code, the last four are the three component station
    IDs and the UTC offset, and whatever lies between is the (possibly
    multi-word) station name.

    Args:
        db: Database connection; ``commit`` is called once after all inserts.
        cursor: Cursor whose ``execute`` runs one parameterised INSERT per
            station.
    """
    station_file_url = 'http://cdiac.ornl.gov/ftp/ushcn_daily/ushcn-stations.txt'
    station_file_name = station_file_url.split('/')[-1]
    cached_path = os.path.join(tempfile.gettempdir(), station_file_name)
    if not os.path.exists(cached_path):
        # Download under a temporary name and rename on success, so an
        # interrupted transfer is never mistaken for a complete cached file.
        partial_path = cached_path + '.part'
        urllib.urlretrieve(station_file_url, partial_path)
        os.rename(partial_path, cached_path)

    part_names = ['id', 'lat', 'lon', 'elevation', 'state_code', 'name',
                  'component_1', 'component_2', 'component_3', 'utc_offset']
    column_names = ['id', 'location', 'elevation', 'state_code', 'name',
                    'component_1', 'component_2', 'component_3', 'utc_offset']
    insert_statement = (
        'INSERT INTO `USHCN_Stations` '
        '(`id`, `location`, `elevation`, `state_code`, `name`, '
        '`component_1`, `component_2`, `component_3`, `utc_offset`) '
        'VALUES (%s, GeomFromText(%s), %s, %s, %s, %s, %s, %s, %s)'
    )

    with open(cached_path) as station_file:
        for line in station_file:
            parts_list = line.split()
            if not parts_list:
                # Blank line (e.g. trailing newline at end of file): nothing
                # to import, and parsing it would fail on the missing fields.
                continue
            # Re-join the name tokens that the whitespace split broke apart.
            name = ' '.join(parts_list[5:len(parts_list) - 4])
            parts_list = parts_list[:5] + [name] + parts_list[-4:]
            parts = dict(zip(part_names, parts_list))
            # The file marks a missing elevation as -999.9 and an absent
            # component station as ------; both are stored as NULL.
            if parts['elevation'] == '-999.9':
                parts['elevation'] = None
            for part in ['component_1', 'component_2', 'component_3']:
                if parts[part] == '------':
                    parts[part] = None
            # Latitude goes in the point's first coordinate, longitude in the second.
            parts['location'] = 'POINT(%s %s)' % (parts['lat'], parts['lon'])
            cursor.execute(
                insert_statement,
                [parts[column_name] for column_name in column_names])
    db.commit()
def import_annotation_data(db, cursor):
    """Fill the three flag lookup tables and commit.

    One multi-row INSERT is executed for each of `USHCN_Measurement_Flags`,
    `USHCN_Quality_Flags` and `USHCN_Source_Flags`, in that order.

    Args:
        db: Database connection; ``commit`` is called once after the inserts.
        cursor: Cursor whose ``execute`` runs each INSERT.
    """
    # The SQL text is kept flush-left inside the literals so the statements
    # sent to the server are unchanged.
    flag_inserts = (
        """INSERT INTO `USHCN_Measurement_Flags` (`flag`, `meaning`) VALUES
('B','precipitation total formed from two 12-hour totals'),
('D','precipitation total formed from four six-hour totals'),
('L','temperature appears to be lagged with respect to reported hour of observation'),
('T','trace of precipitation, snowfall, or snow depth');""",
        """INSERT INTO `USHCN_Quality_Flags` (`flag`, `meaning`) VALUES
('A','failed accumulation total check'),
('D','failed duplicate check'),
('G','failed gap check'),
('I','failed internal consistency check'),
('K','failed streak/frequent-value check'),
('M','failed megaconsistency check'),
('N','failed naught check'),
('O','failed climatological outlier check'),
('R','failed lagged range check'),
('S','failed spatial consistency check'),
('T','failed temporal consistency check'),
('W','temperature too warm for snow'),
('X','failed bounds check');""",
        """INSERT INTO `USHCN_Source_Flags` (`flag`, `meaning`) VALUES
('0','U.S. Cooperative Summary of the Day (NCDC DSI-3200)'),
('1','U.S. Preliminary Cooperative Summary of the Day -- Transmitted '),
('2','U.S. Preliminary Cooperative Summary of the Day -- Keyed from paper forms'),
('6','CDMP Cooperative Summary of the Day (NCDC DSI-3206)'),
('A','U.S. Automated Surface Observing System (ASOS) real-time data (since January 1, 2006)'),
('B','U.S. ASOS data for October 2000-December 2005 (NCDC DSI-3211)'),
('F','U.S. Fort data'),
('G','Official Global Climate Observing System (GCOS) or other government-supplied data'),
('H','High Plains Regional Climate Center real-time data'),
('I','International collection (non U.S. data received through personal contacts)'),
('M','Monthly METAR Extract (additional ASOS data)'),
('Q','Data from several African countries that had been \"quarantined\", that is, withheld from public release until permission was granted from the respective meteorological services'),
('R','NCDC Reference Network Database (Climate Reference Network and Historical Climatology Network-Modernized)'),
('S','Global Summary of the Day (NCDC DSI-9618) NOTE: \"S\" values are derived from hourly synoptic reports exchanged on the Global Telecommunications System (GTS). Daily values derived in this fashion may differ significantly from \"true\" daily data, particularly for precipitation (i.e., use with caution).'),
('X','U.S. First-Order Summary of the Day (NCDC DSI-3210)');""",
    )
    for insert_sql in flag_inserts:
        cursor.execute(insert_sql)
    db.commit()
def import_climate_data(db, cursor):
    """Load daily min/max temperatures from 2007 onward into `USHCN_Measurements`.

    The gzipped daily data file is downloaded into the system temp directory
    the first time it is needed and reused on later runs.  Each line is a
    fixed-width record: station id (columns 0-5), year (6-9), month (10-11),
    element code (12-15), then 31 eight-character day slots, each a
    5-character value followed by one-character measurement, quality and
    source flags.  A value of -9999 and a blank flag are stored as NULL.

    Only TMIN and TMAX records are imported.  Each record becomes one
    ``executemany`` upsert keyed on (date, station_id), so the TMIN and TMAX
    records of the same month fill different columns of the same rows.

    Args:
        db: Database connection; ``commit`` is called once after all records.
        cursor: Cursor whose ``executemany`` runs the upserts.
    """
    import calendar  # function-scope import: only this importer needs it

    climate_file_url = 'http://cdiac.ornl.gov/ftp/ushcn_daily/us.txt.gz'
    climate_file_name = climate_file_url.split('/')[-1]
    cached_path = os.path.join(tempfile.gettempdir(), climate_file_name)
    if not os.path.exists(cached_path):
        # Download under a temporary name and rename on success, so an
        # interrupted transfer is never mistaken for a complete cached file.
        partial_path = cached_path + '.part'
        urllib.urlretrieve(climate_file_url, partial_path)
        os.rename(partial_path, cached_path)

    # Element codes that are imported, mapped to their column prefix.  Any
    # other element (precipitation, snowfall, snow depth, ...) is skipped.
    columns_by_element = {'TMAX': 'temp_max', 'TMIN': 'temp_min'}

    # VALUES(col) makes a duplicate-key update take each row's own parameters.
    # Interpolating literals into the UPDATE clause would apply one day's
    # value and flags to every already-existing row of the month.
    upsert_template = (
        'INSERT INTO `USHCN_Measurements` '
        '(`date`, `station_id`, `{0}`, `{0}_mflag`, `{0}_qflag`, `{0}_sflag`) '
        'VALUES (%s, %s, %s, %s, %s, %s) '
        'ON DUPLICATE KEY UPDATE `{0}` = VALUES(`{0}`), '
        '`{0}_mflag` = VALUES(`{0}_mflag`), '
        '`{0}_qflag` = VALUES(`{0}_qflag`), '
        '`{0}_sflag` = VALUES(`{0}_sflag`);'
    )
    upserts = dict(
        (column, upsert_template.format(column))
        for column in columns_by_element.values())

    first_slot = 16   # column where the day-1 slot starts
    slot_width = 8    # 5-character value + 3 one-character flags

    encountered_stations = set()
    with gzip.open(cached_path) as climate_file:
        for line in climate_file:
            if not isinstance(line, str):
                # gzip yields bytes where str is a text type; decode so the
                # slices below compare text with text.
                line = line.decode('latin-1')
            column = columns_by_element.get(line[12:16])
            if column is None:
                continue
            year = line[6:10]
            if int(year) < 2007:
                continue
            station_id = line[0:6]
            if station_id not in encountered_stations:
                print('Importing data from station %s' % station_id)
                encountered_stations.add(station_id)
            month = line[10:12]

            # Every record carries 31 slots; only the days that exist in this
            # month are imported, so no impossible dates (e.g. Feb 31) are sent.
            days_in_month = calendar.monthrange(int(year), int(month))[1]
            month_to_insert = []
            for day in range(1, days_in_month + 1):
                start = first_slot + (day - 1) * slot_width
                value = line[start:start + 5].strip()
                if value in ('', '-9999'):
                    value = None
                # ljust guards against lines whose trailing blanks were cut.
                flag_chars = line[start + 5:start + 8].ljust(3)
                mflag, qflag, sflag = [
                    flag.strip() or None for flag in flag_chars]
                date = '%s-%s-%02d' % (year, month, day)
                month_to_insert.append(
                    (date, station_id, value, mflag, qflag, sflag))
            cursor.executemany(upserts[column], month_to_insert)
    db.commit()
def clean_up(db, cursor):
    """Close the cursor and then the database connection.

    The connection is closed even when closing the cursor raises, so a
    failing cursor cannot leave the connection open; the cursor's exception
    still propagates to the caller.

    Args:
        db: Database connection to close.
        cursor: Cursor to close first.
    """
    try:
        cursor.close()
    finally:
        db.close()
def main():
    """Run the whole import: connect, (re)create the tables, load all data.

    Command-line options come from ``get_args``.  The 'drop' option is
    consumed here; existing tables are dropped only when it is exactly
    'true' or 'True'.  Every remaining option is forwarded to
    ``MySQLdb.connect``.  Closing the cursor and connection is registered
    with ``atexit``.
    """
    print('Connecting to MySQL')
    connect_kwargs = get_args()
    # 'drop' is not a connection parameter, so take it out before connecting.
    drop = connect_kwargs.pop('drop', None) in ('true', 'True')

    db = MySQLdb.connect(**connect_kwargs)
    cursor = db.cursor()
    atexit.register(clean_up, db, cursor)

    if drop:
        print('Dropping tables')
        drop_tables(cursor)
    print('Creating tables')
    create_tables(cursor)

    import_steps = (
        ('state', import_state_data),
        ('station', import_station_data),
        ('annotation', import_annotation_data),
        ('climate', import_climate_data),
    )
    for label, run_import in import_steps:
        print('Importing %s data' % label)
        run_import(db, cursor)
    print('Done')
# Run the import only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment