Create a gist now

Instantly share code, notes, and snippets.

Quantile hazard curve post-processing, for SHARE
#!/usr/bin/env python
# Copyright (c) 2013, GEM Foundation.
# This is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import argparse
import decimal
import numpy
import os
import sys
from collections import OrderedDict
from itertools import izip
from lxml import etree
#: Maps XML writer constructor keywords to XML attribute names
_ATTR_MAP = OrderedDict([
    ('statistics', 'statistics'),
    ('quantile_value', 'quantileValue'),
    ('smlt_path', 'sourceModelTreePath'),
    ('gsimlt_path', 'gsimTreePath'),
    ('imt', 'IMT'),
    ('investigation_time', 'investigationTime'),
    ('sa_period', 'saPeriod'),
    ('sa_damping', 'saDamping'),
    ('poe', 'poE'),
    ('lon', 'lon'),
    ('lat', 'lat'),
])
def weighted_quantile_curve(curves, weights, quantile):
    """
    Compute the weighted quantile aggregate of a set of curves. This method is
    used in the case where hazard curves are computed using the logic tree
    end-branch enumeration approach. In this case, the weights are explicit.

    :param curves:
        2D array-like of curve PoEs. Each row represents the PoEs for a single
        curve.
    :param weights:
        Array-like of weights, 1 for each input curve.
    :param quantile:
        Quantile value to calculate. Should be in the range [0.0, 1.0].

    :returns:
        A numpy array representing the quantile aggregate of the input
        ``curves`` and ``quantile``, weighting each curve with the specified
        ``weights``.
    """
    # Each curve needs to be associated with a weight:
    assert len(weights) == len(curves)
    # NOTE(LB): Weights might be passed as a list of `decimal.Decimal`
    # types, and numpy.interp can't handle this (it throws TypeErrors).
    # So we explicitly cast to floats here before doing interpolation.
    # (The original converted the weights twice; once is enough.)
    np_weights = numpy.array(weights, dtype=numpy.float64)
    np_curves = numpy.array(curves)

    result_curve = []
    # Interpolate one IML (column) at a time across all input curves.
    for poes in np_curves.transpose():
        sorted_poe_idxs = numpy.argsort(poes)
        sorted_weights = np_weights[sorted_poe_idxs]
        sorted_poes = poes[sorted_poe_idxs]

        # cumulative sum of weights:
        cum_weights = numpy.cumsum(sorted_weights)
        result_curve.append(numpy.interp(quantile, cum_weights, sorted_poes))
    return numpy.array(result_curve)
def get_output_filenames(quantiles):
    """
    Build one output file name per quantile level.

    :param quantiles:
        List of quantile levels.

    :returns:
        List of file names of the form ``qq.<quantile>.xml``, in the same
        order as the input ``quantiles``.
    """
    filename_fmt = 'qq.%s.xml'
    return [filename_fmt % q for q in quantiles]
class HazardCurveReader(object):
    """
    Iteratively parse hazard curves from a NRML hazard curve XML file.

    On construction, the reader scans forward until it has cached the
    calculation metadata (IMT, SA period, investigation time) and the IMLs;
    the per-site curves are then produced lazily by :meth:`read`.

    NOTE(review): relies on module-level constants ``NAMESPACE`` and
    ``NS_MAP`` (NRML/GML XML namespace URI and prefix mapping) which appear
    to have been lost from this copy of the file -- restore them before
    running.

    :param path:
        Path to a NRML hazard curve XML file.
    """

    def __init__(self, path):
        self.path = path
        self.imls = None
        self.imt = None
        self.sa_period = None
        # presumably 5% damping is the NRML default -- TODO confirm
        self.sa_damping = 5.0
        self.investigation_time = None
        self.tree = etree.iterparse(self.path, events=('start', 'end'))

        # cache the IMT/IMLs in the reader first
        # (restored: `break` lines were lost; without them the constructor
        # would always fall through to the RuntimeErrors)
        for event, elem in self.tree:
            if event == 'start' and elem.tag == '{%s}hazardCurves' % NAMESPACE:
                self.imt = elem.attrib.get('IMT')
                self.sa_period = elem.attrib.get('saPeriod')
                self.investigation_time = elem.attrib.get('investigationTime')
                break
        else:
            raise RuntimeError("No <hazardCurves> element found in '%s'"
                               % self.path)

        for event, elem in self.tree:
            if event == 'end':
                if elem.tag == '{%s}IMLs' % NAMESPACE:
                    self.imls = [float(x) for x in elem.text.split()]
                    break
        else:
            raise RuntimeError("No IMLs found!")

    def read(self):
        """
        Return a generator of 2-tuples (site_string, hazard_curve_poes).
        """
        for event, elem in self.tree:
            if event == 'end' and elem.tag == '{%s}hazardCurve' % NAMESPACE:
                # NOTE: Keep the site as a string, always
                [site_elem] = elem.xpath('./gml:Point/gml:pos',
                                         namespaces=NS_MAP)
                site = site_elem.text.strip()
                [poes_elem] = elem.xpath('./nrml:poEs', namespaces=NS_MAP)
                poes = [float(x) for x in poes_elem.text.split()]
                yield site, poes
                # clear up memory once the <hazardCurve> element is processed
                while elem.getprevious() is not None:
                    del elem.getparent()[0]
def _validate_hazard_metadata(md):
Validate metadata `dict` of attributes, which are more or less the same for
hazard curves, hazard maps, and disaggregation histograms.
:param dict md:
`dict` which can contain the following keys:
* statistics
* gsimlt_path
* smlt_path
* imt
* sa_period
* sa_damping
:exc:`ValueError` if the metadata is not valid.
if (md.get('statistics') is not None
and (md.get('smlt_path') is not None
or md.get('gsimlt_path') is not None)):
raise ValueError('Cannot specify both `statistics` and logic tree '
if md.get('statistics') is not None:
# make sure only valid statistics types are specified
if md.get('statistics') not in ('mean', 'quantile'):
raise ValueError('`statistics` must be either `mean` or '
# must specify both logic tree paths
if md.get('smlt_path') is None or md.get('gsimlt_path') is None:
raise ValueError('Both logic tree paths are required for '
'non-statistical results')
if md.get('statistics') == 'quantile':
if md.get('quantile_value') is None:
raise ValueError('quantile stastics results require a quantile'
' value to be specified')
if not md.get('statistics') == 'quantile':
if md.get('quantile_value') is not None:
raise ValueError('Quantile value must be specified with '
'quantile statistics')
if md.get('imt') == 'SA':
if md.get('sa_period') is None:
raise ValueError('`sa_period` is required for IMT == `SA`')
if md.get('sa_damping') is None:
raise ValueError('`sa_damping` is required for IMT == `SA`')
def _set_metadata(element, metadata, attr_map, transform=str):
    """
    Set metadata attributes on a given ``element``.

    :param element:
        :class:`lxml.etree._Element` instance
    :param metadata:
        Dictionary of metadata items containing attribute data for ``element``.
    :param attr_map:
        Dictionary mapping of metadata key->attribute name.
    :param transform:
        A function accepting and returning a single value to be applied to each
        attribute value. Defaults to `str`.
    """
    # `items()` instead of the Python 2-only `iteritems()` keeps this helper
    # portable across Python versions; the mappings involved here are tiny.
    for kw, attr in attr_map.items():
        value = metadata.get(kw)
        # Only emit attributes that were actually provided.
        if value is not None:
            element.set(attr, transform(value))
class HazardCurveXMLWriter(object):
    """
    Hazard Curve XML writer.

    :param path:
        File path (including filename) for XML results to be saved to.
    :param metadata:
        The following keyword args are required:

        * investigation_time: Investigation time (in years) defined in the
          calculation which produced these results.

        The following are more or less optional (combinational rules noted
        below where applicable):

        * statistics: 'mean' or 'quantile'
        * quantile_value: Only required if statistics = 'quantile'.
        * smlt_path: String representing the logic tree path which produced
          these curves. Only required for non-statistical curves.
        * gsimlt_path: String represeting the GSIM logic tree path which
          produced these curves. Only required for non-statisical curves.

        The following additional metadata params are required:

        * imt: Intensity measure type used to compute these hazard curves.
        * imls: Intensity measure levels, which represent the x-axis values of
          each curve.

        The following parameters are optional:

        * sa_period: Only used with imt = 'SA'.
        * sa_damping: Only used with imt = 'SA'.
    """

    def __init__(self, path, **metadata):
        self.path = path
        self.metadata = metadata
        self.root = None
        self.hazard_curves = None  # container element

    def prepare(self):
        """
        Build the document skeleton: the root element, the <hazardCurves>
        container carrying the metadata attributes, and the <IMLs> element.
        """
        # NOTE(review): the etree.Element() call was truncated in this copy;
        # reconstructed assuming a NRML root using the module-level
        # NAMESPACE/NS_MAP constants (lost from this copy) -- confirm against
        # the original file.
        self.root = etree.Element('{%s}nrml' % NAMESPACE, nsmap=NS_MAP)
        self.hazard_curves = etree.SubElement(self.root, 'hazardCurves')
        _set_metadata(self.hazard_curves, self.metadata, _ATTR_MAP)
        imls_elem = etree.SubElement(self.hazard_curves, 'IMLs')
        imls_elem.text = ' '.join([str(x) for x in self.metadata['imls']])

    def write_node(self, site, poes):
        """
        Append one <hazardCurve> node for a single site.

        :param site: A string representing the lon and lat. For example:
            '45.6 18.2'
        :param poes: A list of probability of exceedence values (floats).
        """
        hc_elem = etree.SubElement(self.hazard_curves, 'hazardCurve')
        gml_point = etree.SubElement(hc_elem, '{%s}Point' % GML_NAMESPACE)
        gml_pos = etree.SubElement(gml_point, '{%s}pos' % GML_NAMESPACE)
        gml_pos.text = site
        poes_elem = etree.SubElement(hc_elem, 'poEs')
        poes_elem.text = ' '.join([str(x) for x in poes])

    def close(self):
        """
        Serialize the assembled document to ``self.path``.

        (Restored: the serialization call was truncated in this copy.)
        """
        with open(self.path, 'w') as fh:
            fh.write(etree.tostring(
                self.root, pretty_print=True, xml_declaration=True,
                encoding='UTF-8'))
def set_up_arg_parser():
    """
    Build the command-line interface for this tool.

    :returns:
        A configured :class:`argparse.ArgumentParser` instance.
    """
    parser = argparse.ArgumentParser(
        description='Quantile post-processing tool for SHARE')
    gen_grp = parser.add_argument_group('General')
    # NOTE(review): the add_argument() calls were truncated in this copy of
    # the file; reconstructed minimally. Flags default to None so that the
    # __main__ block can detect missing arguments and show usage.
    gen_grp.add_argument(
        '--quantiles', '-q',
        help=('Comma separated list of quantile values. Example: '
              '0.15,0.5,0.85'))
    gen_grp.add_argument(
        '--in-dir', '-i',
        help='Directory where input is located. Should contain IMT subfolders.'
    )
    gen_grp.add_argument(
        '--out-dir', '-o',
        help=('Directory where output should be saved. Subfolders will be '
              'created for each IMT'))
    return parser
def make_dirs(target_dir):
    """
    Create ``target_dir`` (including any intermediate directories) if it does
    not already exist; do nothing if it is already a directory.

    :param target_dir:
        Path of the directory to create.
    :raises:
        RuntimeError if ``target_dir`` exists but is not a directory.
    """
    # Parenthesized print works identically on Python 2 and 3.
    print('Making dir: %s' % target_dir)
    if os.path.exists(target_dir):
        if not os.path.isdir(target_dir):
            # If it's not a directory, we can't do anything.
            # This is a problem
            raise RuntimeError('%s already exists and is not a directory.'
                               % target_dir)
    else:
        # Restored: the creation branch was lost from this copy -- without
        # it the function never actually made anything.
        os.makedirs(target_dir)
def __validate_imls_imts(readers):
    """
    Check that all ``readers`` agree on IMLs, IMT, and SA period.

    :param readers:
        Non-empty list of :class:`HazardCurveReader`-like objects (anything
        exposing ``imls``, ``imt`` and ``sa_period`` attributes, and
        optionally ``path``).
    :raises:
        RuntimeError if the readers are not uniform.
    """
    head_reader = readers[0]
    tail_readers = readers[1:]
    # Bug fix: the error messages referenced an undefined name `inf`
    # (NameError at raise time); describe the input set by its files instead.
    paths = ', '.join(str(getattr(x, 'path', x)) for x in readers)
    if not all(x.imls == head_reader.imls for x in tail_readers):
        raise RuntimeError('IMLs for hazard curves [%s] do not match' % paths)
    if not all(x.imt == head_reader.imt for x in tail_readers):
        raise RuntimeError('IMTs for hazard curves [%s] do not match' % paths)
    if not all(x.sa_period == head_reader.sa_period for x in tail_readers):
        raise RuntimeError('SA Periods are not uniform in [%s]' % paths)
def do_quantiles(in_folders, out_folders, quantiles):
    """
    For each (input, output) folder pair, compute weighted quantile hazard
    curves at every level in ``quantiles`` from all of the XML curve files
    found in the input folder, writing one output file per quantile level.

    Curve weights are parsed from the input file names (the token between the
    last underscore and the ``.xml`` suffix) and must sum to 1.

    NOTE(review): several lines of this function were truncated in this copy
    (generator construction, writer setup, the per-site write loop); the
    reconstruction below follows the surviving structure and should be
    confirmed against the original.

    :param in_folders: List of input directories, one per IMT.
    :param out_folders: List of output directories, parallel to
        ``in_folders``.
    :param quantiles: List of quantile levels (floats in [0.0, 1.0]).
    """
    # `zip` instead of the Python 2-only `izip`; behavior is identical here.
    for inf, outf in zip(in_folders, out_folders):
        # for each dir, compute quantiles from all the curves we find
        # for all quantile levels
        input_files = (x for x in os.listdir(inf)
                       if x.lower().endswith('.xml'))
        input_files = [os.path.join(inf, x) for x in input_files]

        # Weights are encoded in the file names, after the last underscore.
        weights = [x.lower().split('_')[-1].split('.xml')[0]
                   for x in input_files]
        weights = [decimal.Decimal(x) for x in weights]
        if not sum(weights) == decimal.Decimal('1.0'):
            raise RuntimeError('Weights do not sum to 1.')

        readers = [HazardCurveReader(path) for path in input_files]
        # Sanity check: all inputs must agree on IMLs/IMT/SA period.
        __validate_imls_imts(readers)
        gens = [x.read() for x in readers]

        # One writer per quantile level; **metadata copies the dict, so each
        # writer keeps its own `quantile_value` (asserted below).
        q_writers = []
        metadata = {
            'statistics': 'quantile',
            'sa_period': readers[0].sa_period,
            'sa_damping': readers[0].sa_damping,
            'imt': readers[0].imt,
            'imls': readers[0].imls,
            'investigation_time': readers[0].investigation_time,
        }
        for q in quantiles:
            metadata['quantile_value'] = q
            q_file = os.path.join(outf, 'qq.%s.xml' % q)
            writer = HazardCurveXMLWriter(q_file, **metadata)
            writer.prepare()
            q_writers.append(writer)

        def curves_gen():
            # Yield, per site, the list of (quantile, (site, poes)) results.
            while True:
                try:
                    curve_set = [next(g) for g in gens]
                except StopIteration:
                    # That's the last of the curves
                    # We're done
                    return
                if not curve_set:
                    return
                sites, curve_poes = zip(*curve_set)
                if not len(set(sites)) == 1:
                    raise RuntimeError('Sites are not uniform. '
                                       'This input is bad.')
                yield [(q, (sites[0],
                            weighted_quantile_curve(curve_poes, weights, q)))
                       for q in quantiles]

        for site_cnt, data_per_location in enumerate(curves_gen()):
            print('site %s processed' % site_cnt)
            for i, q in enumerate(quantiles):
                the_writer = q_writers[i]
                assert q == the_writer.metadata['quantile_value']
                _, (site, poes) = data_per_location[i]
                the_writer.write_node(site, poes)

        # close the writers
        for the_writer in q_writers:
            print('Making file: %s' % the_writer.path)
            the_writer.close()
if __name__ == "__main__":
    arg_parser = set_up_arg_parser()
    args = arg_parser.parse_args()
    if None in (args.quantiles, args.in_dir, args.out_dir):
        # Missing one or more required inputs: show usage and bail out.
        # (Restored: the body of this guard was lost from this copy.)
        arg_parser.print_help()
        sys.exit(2)

    quantiles = [float(x) for x in args.quantiles.split(',')]
    base_dir = args.in_dir
    contents = os.listdir(base_dir)
    # IMT input subfolders
    in_folders = [os.path.join(base_dir, x) for x in contents
                  if os.path.isdir(os.path.join(base_dir, x))]
    # Mirror the input IMT subfolder structure under the output directory.
    out_folders = [os.path.join(args.out_dir, os.path.basename(sf))
                   for sf in in_folders]
    for of in out_folders:
        # Restored loop body: create each output dir before writing to it.
        make_dirs(of)

    do_quantiles(in_folders, out_folders, quantiles)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment