@larsbutler
Last active December 15, 2015 22:00
Quantile hazard curve post-processing, for SHARE
#!/usr/bin/env python
# Copyright (c) 2013, GEM Foundation.
#
# This is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import argparse
import decimal
import numpy
import os
import sys
from collections import OrderedDict
from itertools import izip
from lxml import etree
#: Maps XML writer constructor keywords to XML attribute names
_ATTR_MAP = OrderedDict([
    ('statistics', 'statistics'),
    ('quantile_value', 'quantileValue'),
    ('smlt_path', 'sourceModelTreePath'),
    ('gsimlt_path', 'gsimTreePath'),
    ('imt', 'IMT'),
    ('investigation_time', 'investigationTime'),
    ('sa_period', 'saPeriod'),
    ('sa_damping', 'saDamping'),
    ('poe', 'poE'),
    ('lon', 'lon'),
    ('lat', 'lat'),
])
def weighted_quantile_curve(curves, weights, quantile):
"""
Compute the weighted quantile aggregate of a set of curves. This method is
used in the case where hazard curves are computed using the logic tree
end-branch enumeration approach. In this case, the weights are explicit.
:param curves:
2D array-like of curve PoEs. Each row represents the PoEs for a single
curve
:param weights:
Array-like of weights, 1 for each input curve.
:param quantile:
Quantile value to calculate. Should in the range [0.0, 1.0].
:returns:
A numpy array representing the quantile aggregate of the input
``curves`` and ``quantile``, weighting each curve with the specified
``weights``.
"""
# Each curve needs to be associated with a weight:
assert len(weights) == len(curves)
# NOTE(LB): Weights might be passed as a list of `decimal.Decimal`
# types, and numpy.interp can't handle this (it throws TypeErrors).
# So we explicitly cast to floats here before doing interpolation.
weights = numpy.array(weights, dtype=numpy.float64)
result_curve = []
np_curves = numpy.array(curves)
np_weights = numpy.array(weights)
for poes in np_curves.transpose():
sorted_poe_idxs = numpy.argsort(poes)
sorted_weights = np_weights[sorted_poe_idxs]
sorted_poes = poes[sorted_poe_idxs]
# cumulative sum of weights:
cum_weights = numpy.cumsum(sorted_weights)
result_curve.append(numpy.interp(quantile, cum_weights, sorted_poes))
return numpy.array(result_curve)
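
# Illustrative example (hypothetical numbers, not taken from real data):
# three end-branch curves, each with PoEs at three IMLs, and explicit
# weights summing to 1.0. For every IML column the PoEs are sorted in
# ascending order, the weights are accumulated, and the quantile is
# linearly interpolated along that cumulative-weight axis:
#
#     curves = [[0.98, 0.60, 0.10],
#               [0.95, 0.50, 0.08],
#               [0.90, 0.40, 0.05]]
#     weights = [0.2, 0.3, 0.5]
#     median_curve = weighted_quantile_curve(curves, weights, 0.5)
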
def get_output_filenames(quantiles):
"""
:param quantiles:
List of quantile levels.
"""
filename_fmt = 'qq.%s.xml'
return [filename_fmt % q for q in quantiles]
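
# For example, get_output_filenames([0.05, 0.5, 0.95]) returns
# ['qq.0.05.xml', 'qq.0.5.xml', 'qq.0.95.xml'] -- the same 'qq.<q>.xml'
# pattern that do_quantiles() builds inline below.
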
NAMESPACE = 'http://openquake.org/xmlns/nrml/0.4'
GML_NAMESPACE = 'http://www.opengis.net/gml'
NS_MAP = {'nrml': NAMESPACE, 'gml': GML_NAMESPACE}
class HazardCurveReader(object):
    """
    Incremental reader for NRML hazard curve XML files.

    :param path:
        Path to a hazard curve XML file.
    """

    def __init__(self, path):
        self.path = path
        self.imls = None
        self.imt = None
        self.sa_period = None
        self.sa_damping = 5.0
        self.investigation_time = None

        self.tree = etree.iterparse(self.path, events=('start', 'end'))

        # cache the IMT/IMLs in the reader first
        for event, elem in self.tree:
            if event == 'start' and elem.tag == '{%s}hazardCurves' % NAMESPACE:
                self.imt = elem.attrib.get('IMT')
                self.sa_period = elem.attrib.get('saPeriod')
                self.investigation_time = elem.attrib.get('investigationTime')
                break
        else:
            raise RuntimeError("No <hazardCurves> element found in '%s'"
                               % self.path)

        for event, elem in self.tree:
            if event == 'end':
                if elem.tag == '{%s}IMLs' % NAMESPACE:
                    self.imls = [float(x) for x in elem.text.split()]
                    break
        else:
            raise RuntimeError("No IMLs found!")

    def read(self):
        """
        Return a generator of 2-tuples: (site_string, hazard_curve_poes).
        """
        for event, elem in self.tree:
            if event == 'end' and elem.tag == '{%s}hazardCurve' % NAMESPACE:
                # NOTE: Keep the site as a string, always
                [site_elem] = elem.xpath('./gml:Point/gml:pos',
                                         namespaces=NS_MAP)
                site = site_elem.text.strip()
                [poes_elem] = elem.xpath('./nrml:poEs', namespaces=NS_MAP)
                poes = [float(x) for x in poes_elem.text.split()]
                yield site, poes

                # clear up memory once the <hazardCurve> element is processed
                elem.clear()
                while elem.getprevious() is not None:
                    del elem.getparent()[0]

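
# Usage sketch for the reader (the file name is hypothetical): header
# metadata (IMT, IMLs, investigation time, ...) is cached on construction,
# then curves are streamed one <hazardCurve> element at a time:
#
#     reader = HazardCurveReader('hazard_curve_b1_0.4.xml')
#     print reader.imt, reader.imls
#     for site, poes in reader.read():
#         pass  # site is a '<lon> <lat>' string, poes a list of floats
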
def _validate_hazard_metadata(md):
"""
Validate metadata `dict` of attributes, which are more or less the same for
hazard curves, hazard maps, and disaggregation histograms.
:param dict md:
`dict` which can contain the following keys:
* statistics
* gsimlt_path
* smlt_path
* imt
* sa_period
* sa_damping
:raises:
:exc:`ValueError` if the metadata is not valid.
"""
if (md.get('statistics') is not None
and (md.get('smlt_path') is not None
or md.get('gsimlt_path') is not None)):
raise ValueError('Cannot specify both `statistics` and logic tree '
'paths')
if md.get('statistics') is not None:
# make sure only valid statistics types are specified
if md.get('statistics') not in ('mean', 'quantile'):
raise ValueError('`statistics` must be either `mean` or '
'`quantile`')
else:
# must specify both logic tree paths
if md.get('smlt_path') is None or md.get('gsimlt_path') is None:
raise ValueError('Both logic tree paths are required for '
'non-statistical results')
if md.get('statistics') == 'quantile':
if md.get('quantile_value') is None:
raise ValueError('quantile stastics results require a quantile'
' value to be specified')
if not md.get('statistics') == 'quantile':
if md.get('quantile_value') is not None:
raise ValueError('Quantile value must be specified with '
'quantile statistics')
if md.get('imt') == 'SA':
if md.get('sa_period') is None:
raise ValueError('`sa_period` is required for IMT == `SA`')
if md.get('sa_damping') is None:
raise ValueError('`sa_damping` is required for IMT == `SA`')
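
# For reference, a metadata dict that passes validation for quantile
# results looks like this (values are illustrative only):
#
#     _validate_hazard_metadata({
#         'statistics': 'quantile',
#         'quantile_value': 0.5,
#         'imt': 'SA',
#         'sa_period': 0.1,
#         'sa_damping': 5.0,
#     })
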
def _set_metadata(element, metadata, attr_map, transform=str):
"""
Set metadata attributes on a given ``element``.
:param element:
:class:`lxml.etree._Element` instance
:param metadata:
Dictionary of metadata items containing attribute data for ``element``.
:param attr_map:
Dictionary mapping of metadata key->attribute name.
:param transform:
A function accepting and returning a single value to be applied to each
attribute value. Defaults to `str`.
"""
for kw, attr in attr_map.iteritems():
value = metadata.get(kw)
if value is not None:
element.set(attr, transform(value))
SERIALIZE_NS_MAP = {None: NAMESPACE, 'gml': GML_NAMESPACE}
class HazardCurveXMLWriter(object):
"""
Hazard Curve XML writer. See :class:`BaseCurveXMLWriter` for a list of
general constructor inputs.
:param path:
File path (including filename) for XML results to be saved to.
:param metadata:
The following keyword args are required:
* investigation_time: Investigation time (in years) defined in the
calculation which produced these results.
The following are more or less optional (combinational rules noted
below where applicable):
* statistics: 'mean' or 'quantile'
* quantile_value: Only required if statistics = 'quantile'.
* smlt_path: String representing the logic tree path which produced
these curves. Only required for non-statistical curves.
* gsimlt_path: String represeting the GSIM logic tree path which
produced these curves. Only required for non-statisical curves.
The following additional metadata params are required:
* imt: Intensity measure type used to compute these hazard curves.
* imls: Intensity measure levels, which represent the x-axis values of
each curve.
The following parameters are optional:
* sa_period: Only used with imt = 'SA'.
* sa_damping: Only used with imt = 'SA'.
"""
def __init__(self, path, **metadata):
self.path = path
self.metadata = metadata
_validate_hazard_metadata(metadata)
self.root = None
self.hazard_curves = None # container element
def prepare(self):
self.root = etree.Element(
'nrml',
nsmap=SERIALIZE_NS_MAP
)
self.hazard_curves = etree.SubElement(self.root, 'hazardCurves')
_set_metadata(self.hazard_curves, self.metadata, _ATTR_MAP)
imls_elem = etree.SubElement(self.hazard_curves, 'IMLs')
imls_elem.text = ' '.join([str(x) for x in self.metadata['imls']])
def write_node(self, site, poes):
"""
:param site: A string representing the lon and lat. For example:
'45.6 18.2'
:param poes:: A list of probability of exceedence values (floats).
"""
hc_elem = etree.SubElement(self.hazard_curves, 'hazardCurve')
gml_point = etree.SubElement(hc_elem, '{%s}Point' % GML_NAMESPACE)
gml_pos = etree.SubElement(gml_point, '{%s}pos' % GML_NAMESPACE)
gml_pos.text = site
poes_elem = etree.SubElement(hc_elem, 'poEs')
poes_elem.text = ' '.join([str(x) for x in poes])
def close(self):
with open(self.path, 'w') as fh:
fh.write(etree.tostring(
self.root, pretty_print=True, xml_declaration=True,
encoding='UTF-8'))
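
# Writer usage sketch (file name and metadata values are illustrative):
#
#     writer = HazardCurveXMLWriter(
#         'qq.0.5.xml', investigation_time=50.0, imt='PGA',
#         imls=[0.005, 0.007, 0.0098], statistics='quantile',
#         quantile_value=0.5)
#     writer.prepare()
#     writer.write_node('45.6 18.2', [0.98, 0.95, 0.90])
#     writer.close()
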
def set_up_arg_parser():
    parser = argparse.ArgumentParser(
        description='Quantile post-processing tool for SHARE'
    )
    gen_grp = parser.add_argument_group('General')
    gen_grp.add_argument(
        '--quantiles', '-q',
        metavar='QUANTILES',
        help=('Comma-separated list of quantile values. Example: '
              '--quantiles=0.05,0.15,0.5,0.85,0.95')
    )
    gen_grp.add_argument(
        '--in-dir', '-i',
        metavar='INPUT_DIR',
        help='Directory where input is located. Should contain IMT subfolders.'
    )
    gen_grp.add_argument(
        '--out-dir', '-o',
        metavar='OUTPUT_DIR',
        help=('Directory where output should be saved. Subfolders will be '
              'created for each IMT.')
    )
    return parser
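
# Example invocation (the script name `quantile_pp.py` is a placeholder;
# substitute whatever name this file is saved under):
#
#     $ python quantile_pp.py -q 0.05,0.5,0.95 -i /path/to/in -o /path/to/out
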
def make_dirs(target_dir):
print "Making dir: %s" % target_dir
if os.path.exists(target_dir):
if not os.path.isdir(target_dir):
# If it's not a directory, we can't do anything.
# This is a problem
raise RuntimeError('%s already exists and is not a directory.'
% target_dir)
else:
os.makedirs(target_dir)
def __validate_imls_imts(readers, in_dir):
    """
    Check that all ``readers`` (for the curve files found in ``in_dir``)
    agree on IMLs, IMT, and SA period.
    """
    head_reader = readers[0]
    tail_readers = readers[1:]
    if not all([x.imls == head_reader.imls for x in tail_readers]):
        raise RuntimeError('IMLs for hazard curves in dir "%s" do not match'
                           % in_dir)
    if not all([x.imt == head_reader.imt for x in tail_readers]):
        raise RuntimeError('IMTs for hazard curves in dir "%s" do not match'
                           % in_dir)
    if not all([x.sa_period == head_reader.sa_period for x in tail_readers]):
        raise RuntimeError('SA periods are not uniform in dir "%s"' % in_dir)

def do_quantiles(in_folders, out_folders, quantiles):
    for inf, outf in izip(in_folders, out_folders):
        # for each dir, compute quantiles from all the curves we find,
        # for all quantile levels
        input_files = (x for x in os.listdir(inf)
                       if x.lower().endswith('.xml'))
        input_files = [os.path.join(inf, x) for x in input_files]
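        # The end-branch weight is parsed from the input file name: the last
        # '_'-separated token before the '.xml' extension is taken as the
        # weight, so a file named, e.g., curves_b1_0.25.xml (illustrative
        # name) carries a weight of 0.25. The weights of all files in a
        # folder must sum to exactly 1.0.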
        weights = [x.lower().split('_')[-1].split('.xml')[0]
                   for x in input_files]
        weights = [decimal.Decimal(x) for x in weights]
        if not sum(weights) == decimal.Decimal('1.0'):
            raise RuntimeError('Weights do not sum to 1.')

        readers = [HazardCurveReader(path) for path in input_files]
        __validate_imls_imts(readers, inf)
        gens = [x.read() for x in readers]

        # one writer per quantile level, all sharing the same base metadata
        q_writers = []
        metadata = {
            'statistics': 'quantile',
            'sa_period': readers[0].sa_period,
            'sa_damping': readers[0].sa_damping,
            'imt': readers[0].imt,
            'imls': readers[0].imls,
            'investigation_time': readers[0].investigation_time,
        }
        for q in quantiles:
            metadata['quantile_value'] = q
            q_file = os.path.join(outf, 'qq.%s.xml' % q)
            writer = HazardCurveXMLWriter(q_file, **metadata)
            writer.prepare()
            q_writers.append(writer)

        def curves_gen():
            while True:
                try:
                    curve_set = (g.next() for g in gens)
                    curve_set = list(curve_set)
                    if not curve_set:
                        # That's the last of the curves
                        # We're done
                        break
                    sites, curve_poes = zip(*curve_set)
                    if not len(set(sites)) == 1:
                        raise RuntimeError('Sites are not uniform. '
                                           'This input is bad.')
                    yield [
                        (q, (sites[0],
                             weighted_quantile_curve(curve_poes, weights,
                                                     q).tolist()))
                        for q in quantiles
                    ]
                except StopIteration:
                    break

        for site_cnt, data_per_location in enumerate(curves_gen()):
            print 'site %s processed' % site_cnt
            for i, q in enumerate(quantiles):
                the_writer = q_writers[i]
                assert q == the_writer.metadata['quantile_value']
                the_writer.write_node(data_per_location[i][1][0],
                                      data_per_location[i][1][1])

        # close the writers
        for the_writer in q_writers:
            the_writer.close()
            print 'Making file: %s' % the_writer.path


if __name__ == "__main__":
    arg_parser = set_up_arg_parser()
    args = arg_parser.parse_args()

    if None in (args.quantiles, args.in_dir, args.out_dir):
        arg_parser.print_usage()
        sys.exit(0)

    quantiles = [float(x) for x in args.quantiles.split(',')]

    base_dir = args.in_dir
    contents = os.listdir(base_dir)
    # IMT input subfolders
    in_folders = [os.path.join(base_dir, x) for x in contents
                  if os.path.isdir(os.path.join(base_dir, x))]
    out_folders = [os.path.join(args.out_dir, os.path.basename(sf))
                   for sf in in_folders]
    for of in out_folders:
        make_dirs(of)

    do_quantiles(in_folders, out_folders, quantiles)