Last active
December 15, 2015 22:00
-
-
Save larsbutler/5330230 to your computer and use it in GitHub Desktop.
Quantile hazard curve post-processing, for SHARE
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Copyright (c) 2013, GEM Foundation. | |
# | |
# This is free software: you can redistribute it and/or modify it | |
# under the terms of the GNU Affero General Public License as published | |
# by the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This software is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU Affero General Public License | |
# along with this software. If not, see <http://www.gnu.org/licenses/>. | |
import argparse | |
import decimal | |
import numpy | |
import os | |
import sys | |
from collections import OrderedDict | |
from itertools import izip | |
from lxml import etree | |
#: Maps XML writer constructor keywords to XML attribute names.
#: Insertion order matters: attributes are serialized in this order, so an
#: OrderedDict is used to keep the XML output deterministic.
_ATTR_MAP = OrderedDict([
    ('statistics', 'statistics'),
    ('quantile_value', 'quantileValue'),
    ('smlt_path', 'sourceModelTreePath'),
    ('gsimlt_path', 'gsimTreePath'),
    ('imt', 'IMT'),
    ('investigation_time', 'investigationTime'),
    ('sa_period', 'saPeriod'),
    ('sa_damping', 'saDamping'),
    ('poe', 'poE'),
    ('lon', 'lon'),
    ('lat', 'lat'),
])
def weighted_quantile_curve(curves, weights, quantile):
    """
    Compute the weighted quantile aggregate of a set of curves. This method is
    used in the case where hazard curves are computed using the logic tree
    end-branch enumeration approach. In this case, the weights are explicit.

    :param curves:
        2D array-like of curve PoEs. Each row represents the PoEs for a single
        curve.
    :param weights:
        Array-like of weights, 1 for each input curve.
    :param quantile:
        Quantile value to calculate. Should be in the range [0.0, 1.0].
    :returns:
        A numpy array representing the quantile aggregate of the input
        ``curves`` and ``quantile``, weighting each curve with the specified
        ``weights``.
    """
    # Each curve needs to be associated with a weight:
    assert len(weights) == len(curves)
    # NOTE(LB): Weights might be passed as a list of `decimal.Decimal`
    # types, and numpy.interp can't handle this (it throws TypeErrors).
    # So we explicitly cast to floats here before doing interpolation.
    # (The original converted `weights` to a numpy array twice; once is
    # enough.)
    np_weights = numpy.array(weights, dtype=numpy.float64)

    np_curves = numpy.array(curves)
    result_curve = []
    # Aggregate one intensity level (one column of the curve matrix) at a
    # time:
    for poes in np_curves.transpose():
        sorted_poe_idxs = numpy.argsort(poes)
        sorted_weights = np_weights[sorted_poe_idxs]
        sorted_poes = poes[sorted_poe_idxs]

        # The cumulative sum of the sorted weights forms an empirical CDF
        # over the curves; interpolate the requested quantile on it:
        cum_weights = numpy.cumsum(sorted_weights)
        result_curve.append(numpy.interp(quantile, cum_weights, sorted_poes))
    return numpy.array(result_curve)
def get_output_filenames(quantiles):
    """
    Build the output filename for each quantile level.

    :param quantiles:
        List of quantile levels.
    :returns:
        A list of filenames of the form ``qq.<quantile>.xml``, one per
        quantile level.
    """
    return ['qq.%s.xml' % level for level in quantiles]
# XML namespaces used when reading and writing NRML hazard curve files:
NAMESPACE = 'http://openquake.org/xmlns/nrml/0.4'
GML_NAMESPACE = 'http://www.opengis.net/gml'
# Prefix -> namespace map used for xpath lookups while reading:
NS_MAP = {'nrml': NAMESPACE, 'gml': GML_NAMESPACE}
class HazardCurveReader(object):
    """
    Incremental (iterparse-based) reader for NRML hazard curve XML files.

    On construction, the file header is parsed to cache the curve metadata
    (IMT, SA period, SA damping, investigation time) and the IMLs (the
    x-axis values shared by every curve in the file). The individual curves
    are then streamed lazily via :meth:`read`.

    :param path:
        Path to a NRML hazard curve XML file.
    :raises RuntimeError:
        If the file contains no ``<hazardCurves>`` element or no IMLs.
    """

    def __init__(self, path):
        self.path = path
        self.imls = None
        self.imt = None
        self.sa_period = None
        self.sa_damping = None
        self.investigation_time = None
        self.tree = etree.iterparse(self.path, events=('start', 'end'))

        # cache the IMT/IMLs in the reader first
        for event, elem in self.tree:
            if event == 'start' and elem.tag == '{%s}hazardCurves' % NAMESPACE:
                self.imt = elem.attrib.get('IMT')
                self.sa_period = elem.attrib.get('saPeriod')
                # BUG FIX: the damping was hard-coded to 5.0 and the
                # `saDamping` attribute in the file was silently ignored;
                # read it from the file, falling back to the previous
                # default of 5.0 when absent.
                self.sa_damping = elem.attrib.get('saDamping', 5.0)
                self.investigation_time = elem.attrib.get('investigationTime')
                break
        else:
            raise RuntimeError("No <hazardCurves> element found in '%s'"
                               % self.path)

        for event, elem in self.tree:
            if event == 'end':
                if elem.tag == '{%s}IMLs' % NAMESPACE:
                    self.imls = [float(x) for x in elem.text.split()]
                    break
        else:
            raise RuntimeError("No IMLs found!")

    def read(self):
        """
        Return a generator of 2-tuples (site_string, hazard_curve_poes).
        """
        for event, elem in self.tree:
            if event == 'end' and elem.tag == '{%s}hazardCurve' % NAMESPACE:
                # NOTE: Keep the site as a string, always
                [site_elem] = elem.xpath('./gml:Point/gml:pos',
                                         namespaces=NS_MAP)
                site = site_elem.text.strip()
                [poes_elem] = elem.xpath('./nrml:poEs', namespaces=NS_MAP)
                poes = [float(x) for x in poes_elem.text.split()]
                yield site, poes

                # clear up memory once the <hazardCurve> element is
                # processed; dropping already-seen siblings keeps the
                # iterparse tree small for large files
                elem.clear()
                while elem.getprevious() is not None:
                    del elem.getparent()[0]
def _validate_hazard_metadata(md):
    """
    Validate metadata `dict` of attributes, which are more or less the same for
    hazard curves, hazard maps, and disaggregation histograms.

    :param dict md:
        `dict` which can contain the following keys:

        * statistics
        * gsimlt_path
        * smlt_path
        * imt
        * sa_period
        * sa_damping

    :raises:
        :exc:`ValueError` if the metadata is not valid.
    """
    # `statistics` and explicit logic tree paths are mutually exclusive:
    if (md.get('statistics') is not None
            and (md.get('smlt_path') is not None
                 or md.get('gsimlt_path') is not None)):
        raise ValueError('Cannot specify both `statistics` and logic tree '
                         'paths')

    if md.get('statistics') is not None:
        # make sure only valid statistics types are specified
        if md.get('statistics') not in ('mean', 'quantile'):
            raise ValueError('`statistics` must be either `mean` or '
                             '`quantile`')
    else:
        # must specify both logic tree paths
        if md.get('smlt_path') is None or md.get('gsimlt_path') is None:
            raise ValueError('Both logic tree paths are required for '
                             'non-statistical results')

    # quantile statistics require a quantile value ...
    # (typo "stastics" fixed in the message below)
    if md.get('statistics') == 'quantile':
        if md.get('quantile_value') is None:
            raise ValueError('quantile statistics results require a quantile'
                             ' value to be specified')

    # ... and a quantile value is only meaningful for quantile statistics:
    if not md.get('statistics') == 'quantile':
        if md.get('quantile_value') is not None:
            raise ValueError('Quantile value must be specified with '
                             'quantile statistics')
def _set_metadata(element, metadata, attr_map, transform=str):
    """
    Set metadata attributes on a given ``element``.

    :param element:
        :class:`lxml.etree._Element` instance
    :param metadata:
        Dictionary of metadata items containing attribute data for ``element``.
    :param attr_map:
        Dictionary mapping of metadata key->attribute name.
    :param transform:
        A function accepting and returning a single value to be applied to each
        attribute value. Defaults to `str`.
    """
    # `items()` instead of the Python 2-only `iteritems()`; behavior is
    # identical and the eager copy is negligible for this small map.
    for kw, attr in attr_map.items():
        value = metadata.get(kw)
        # Skip missing metadata keys entirely rather than serializing "None":
        if value is not None:
            element.set(attr, transform(value))
# Namespace map used when serializing: NRML is the default (unprefixed)
# namespace; GML keeps its `gml` prefix.
SERIALIZE_NS_MAP = {None: NAMESPACE, 'gml': GML_NAMESPACE}
class HazardCurveXMLWriter(object):
    """
    Hazard Curve XML writer.

    :param path:
        File path (including filename) for XML results to be saved to.
    :param metadata:
        The following keyword args are required:

        * investigation_time: Investigation time (in years) defined in the
          calculation which produced these results.
        * imt: Intensity measure type used to compute these hazard curves.
        * imls: Intensity measure levels, which represent the x-axis values
          of each curve.

        The following are more or less optional (combinational rules noted
        below where applicable):

        * statistics: 'mean' or 'quantile'
        * quantile_value: Only required if statistics = 'quantile'.
        * smlt_path: String representing the logic tree path which produced
          these curves. Only required for non-statistical curves.
        * gsimlt_path: String representing the GSIM logic tree path which
          produced these curves. Only required for non-statistical curves.
        * sa_period: Only used with imt = 'SA'.
        * sa_damping: Only used with imt = 'SA'.
    """

    def __init__(self, path, **metadata):
        self.path = path
        self.metadata = metadata
        # Fail fast on inconsistent metadata before any XML is built:
        _validate_hazard_metadata(metadata)
        self.root = None
        self.hazard_curves = None  # container element

    def prepare(self):
        """Build the document root and the <hazardCurves> container."""
        self.root = etree.Element(
            'nrml',
            nsmap=SERIALIZE_NS_MAP
        )
        self.hazard_curves = etree.SubElement(self.root, 'hazardCurves')
        _set_metadata(self.hazard_curves, self.metadata, _ATTR_MAP)
        imls_elem = etree.SubElement(self.hazard_curves, 'IMLs')
        imls_elem.text = ' '.join([str(x) for x in self.metadata['imls']])

    def write_node(self, site, poes):
        """
        Append one <hazardCurve> node for a single site.

        :param site: A string representing the lon and lat. For example:
            '45.6 18.2'
        :param poes: A list of probability of exceedence values (floats).
        """
        hc_elem = etree.SubElement(self.hazard_curves, 'hazardCurve')
        gml_point = etree.SubElement(hc_elem, '{%s}Point' % GML_NAMESPACE)
        gml_pos = etree.SubElement(gml_point, '{%s}pos' % GML_NAMESPACE)
        gml_pos.text = site
        poes_elem = etree.SubElement(hc_elem, 'poEs')
        poes_elem.text = ' '.join([str(x) for x in poes])

    def close(self):
        """Serialize the accumulated tree to ``self.path``."""
        # BUG FIX: etree.tostring(..., encoding='UTF-8') returns an
        # encoded byte string, so the file must be opened in binary mode
        # (the original used text mode 'w').
        with open(self.path, 'wb') as fh:
            fh.write(etree.tostring(
                self.root, pretty_print=True, xml_declaration=True,
                encoding='UTF-8'))
def set_up_arg_parser():
    """
    Build the command-line interface for this tool.

    :returns:
        A configured :class:`argparse.ArgumentParser` with ``--quantiles``,
        ``--in-dir``, and ``--out-dir`` options (all default to ``None``).
    """
    parser = argparse.ArgumentParser(
        description='Quantile post-processing tool for SHARE'
    )
    gen_grp = parser.add_argument_group('General')
    gen_grp.add_argument(
        '--quantiles', '-q',
        metavar='QUANTILES',
        # BUG FIX: the example previously read "0.85.0.95" (a period
        # instead of the comma separator the tool actually expects).
        help=('Comma separated list of quantile values. Example: '
              '--quantiles=0.05,0.15,0.5,0.85,0.95')
    )
    gen_grp.add_argument(
        '--in-dir', '-i',
        metavar='INPUT_DIR',
        help='Directory where input is located. Should contain IMT subfolders.'
    )
    gen_grp.add_argument(
        '--out-dir', '-o',
        metavar='OUTPUT_DIR',
        help=('Directory where output should be saved. Subfolders will be '
              'created for each IMT')
    )
    return parser
def make_dirs(target_dir):
    """
    Create ``target_dir`` (including intermediate directories) if needed.

    A no-op when the directory already exists.

    :param target_dir:
        Path of the directory to create.
    :raises RuntimeError:
        If ``target_dir`` exists but is not a directory.
    """
    # Single-argument print(...) behaves identically on Python 2 and 3
    # (the original used a Python 2-only print statement).
    print('Making dir: %s' % target_dir)
    if os.path.exists(target_dir):
        if not os.path.isdir(target_dir):
            # If it's not a directory, we can't do anything.
            # This is a problem
            raise RuntimeError('%s already exists and is not a directory.'
                               % target_dir)
    else:
        os.makedirs(target_dir)
def __validate_imls_imts(readers):
    """
    Check that all readers agree on IMLs, IMT, and SA period.

    :param readers:
        List of :class:`HazardCurveReader` instances, all for files from
        the same input directory.
    :raises RuntimeError:
        If the IMLs, IMTs, or SA periods are not uniform across readers.
    """
    head_reader = readers[0]
    tail_readers = readers[1:]
    # BUG FIX: the original error messages interpolated `inf`, a name that
    # does not exist in this function's scope (it leaked from the caller's
    # loop variable), so raising would have produced a NameError instead of
    # the intended RuntimeError. Derive the directory from the reader.
    where = os.path.dirname(head_reader.path)
    if not all(x.imls == head_reader.imls for x in tail_readers):
        raise RuntimeError('IMLs for hazard curves in dir "%s" do not match'
                           % where)
    if not all(x.imt == head_reader.imt for x in tail_readers):
        raise RuntimeError('IMTs for hazard curves in dir "%s" do not match'
                           % where)
    if not all(x.sa_period == head_reader.sa_period for x in tail_readers):
        raise RuntimeError('SA Periods are not uniform in dir "%s"' % where)
def do_quantiles(in_folders, out_folders, quantiles):
    """
    Compute quantile hazard curves for each input folder and write one
    output file per quantile level (named ``qq.<quantile>.xml``) into the
    matching output folder.

    :param in_folders:
        Directories containing weighted hazard curve XML files. The weight
        of each file is encoded in its filename as the last ``_``-separated
        token before ``.xml`` (e.g. ``curves_0.1.xml`` -> weight 0.1).
    :param out_folders:
        Output directories, parallel to ``in_folders``.
    :param quantiles:
        List of quantile levels (floats in [0.0, 1.0]).
    :raises RuntimeError:
        If the weights in a folder do not sum to 1, or the sites listed in
        the input files are not uniform.
    """
    # zip is equivalent to the original Python 2-only itertools.izip for
    # these finite, same-length lists (and works on Python 3).
    for inf, outf in zip(in_folders, out_folders):
        # for each dir, compute quantiles from all the curves we find
        # for all quantile levels
        input_files = (x for x in os.listdir(inf)
                       if x.lower().endswith('.xml'))
        input_files = [os.path.join(inf, x) for x in input_files]
        weights = [x.lower().split('_')[-1].split('.xml')[0]
                   for x in input_files]
        # Decimal avoids float round-off when checking the sum below:
        weights = [decimal.Decimal(x) for x in weights]
        if not sum(weights) == decimal.Decimal('1.0'):
            raise RuntimeError('Weights do not sum to 1.')

        readers = [HazardCurveReader(path) for path in input_files]
        __validate_imls_imts(readers)
        gens = [x.read() for x in readers]

        q_writers = []
        metadata = {
            'statistics': 'quantile',
            'sa_period': readers[0].sa_period,
            'sa_damping': readers[0].sa_damping,
            'imt': readers[0].imt,
            'imls': readers[0].imls,
            'investigation_time': readers[0].investigation_time,
        }
        # One writer per quantile level, all sharing the base metadata:
        for q in quantiles:
            metadata['quantile_value'] = q
            q_file = os.path.join(outf, 'qq.%s.xml' % q)
            writer = HazardCurveXMLWriter(q_file, **metadata)
            writer.prepare()
            q_writers.append(writer)

        def curves_gen():
            # Yields, per site, the list of (quantile, (site, poes)) pairs.
            while True:
                # next(g) is Python 2/3 compatible (was `g.next()`). Stop
                # collecting at the first exhausted reader, exactly as the
                # original generator expression silently did.
                curve_set = []
                for g in gens:
                    try:
                        curve_set.append(next(g))
                    except StopIteration:
                        break
                if not curve_set:
                    # That's the last of the curves
                    # We're done
                    break
                sites, curve_poes = zip(*curve_set)
                # Every input file must list the sites in the same order:
                if not len(set(sites)) == 1:
                    raise RuntimeError('Sites are not uniform. '
                                       'This input is bad.')
                yield [(q, (sites[0],
                            weighted_quantile_curve(curve_poes,
                                                    weights,
                                                    q).tolist()))
                       for q in quantiles]

        for site_cnt, data_per_location in enumerate(curves_gen()):
            print('site %s processed' % site_cnt)
            for i, q in enumerate(quantiles):
                the_writer = q_writers[i]
                # Sanity check: writer order matches quantile order.
                assert q == the_writer.metadata['quantile_value']
                the_writer.write_node(data_per_location[i][1][0],
                                      data_per_location[i][1][1])

        # close the writers
        for the_writer in q_writers:
            the_writer.close()
            print('Making file: %s' % the_writer.path)
if __name__ == "__main__": | |
arg_parser = set_up_arg_parser() | |
args = arg_parser.parse_args() | |
if None in (args.quantiles, args.in_dir, args.out_dir): | |
arg_parser.print_usage() | |
sys.exit(0) | |
quantiles = [float(x) for x in args.quantiles.split(',')] | |
base_dir = args.in_dir | |
contents = os.listdir(base_dir) | |
# IMT input subfolders | |
in_folders = [os.path.join(base_dir, x) for x in contents | |
if os.path.isdir(os.path.join(base_dir, x))] | |
out_folders = [os.path.join(args.out_dir, os.path.basename(sf)) | |
for sf in in_folders] | |
for of in out_folders: | |
make_dirs(of) | |
do_quantiles(in_folders, out_folders, quantiles) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment