Created
April 9, 2020 20:56
-
-
Save Hanlin-Dong/ef65ff3e2c9defefe931bab68df9fb99 to your computer and use it in GitHub Desktop.
pyearthquake.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Introduction | |
============ | |
`pyearthquake` is a light-weight module with basic earthquake engineering utilities. | |
The aim of this module is to help engineers play with earthquakes by python programming. | |
*Features* | |
-------- | |
1. Convert .AT2 file downloaded from PEER database to nicely aligned form. | |
2. Select and match ground motions to the target spectrum with Monte Carlo simulation and Least Square optimization. | |
3. Beautify the ground motions by truncating, trimming, changing dt, and renaming. | |
4. Store a suite of motions in a single file or separate files. | |
5. Generate displacement spectrums and pesudo acceleration spectrums. | |
6. Inspect a suite or a motion with nice html interactive plots, thanks to [plotly](https://plot.ly). | |
7. Create Code Spectrums. Currently support Chinese Code. | |
Fact Sheet | |
---------- | |
Version: 1.0 | |
Author: [Hanlin Dong](http://www.hanlindong.com) | |
License: MIT | |
Last Update: 2020-04-08 | |
Content Briefing | |
---------------- | |
Currently, it has three main classes: `Spectrum`, `Motion`, and `Suite` | |
`Spectrum` : a class storing seismic spectrum. | |
Some static methods are created confining to Code Spectrums. | |
`Motion` : a class storing acceleration timehistory. | |
Motion has utility functions to nicely parse PEER motion file. | |
Motion also generates spectrums. | |
`Suite` : a class storing a suite of motions and their target spectrum. | |
Suite has utilities to load a PEER file folder. | |
Suite has optimization functions to select a bunch of earthquakes out of a large number of earthquakes | |
with Monte Carlo simulation and bounded least square optimization. | |
The advantage is that the shape of the accels is not changed unlike wavelet methods, | |
but the average match result is still good. | |
Suite can write all the acceleration files seperately or integrally. | |
Suite can plot all the accelerations and spectrums interactively for users to inspect. | |
Suite can generate .tcl file to be used in OpenSees. | |
All the classes mentioned above can be stored as string file. | |
Currently they are json files with ext: .spectrum, .motion, .suite. | |
These files can be read by static methods in every class. | |
Change log: | |
----------- | |
2020-04-08 13:53:32 v1.0 | |
- Join the three classes to one single file, refactor some functions. | |
- Unify some methods, and create good documentation. | |
- Add plotly interactive plots. | |
''' | |
import os | |
import re | |
import json | |
import random | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
from scipy.signal import lsim | |
from scipy.interpolate import interp1d | |
from scipy.optimize import least_squares | |
class Spectrum(): | |
'''A class storing a spectrum. | |
It can be init-ed simply with (periods, spectrum) sampling points. | |
It can also be init-ed by static functions to generate Code-confined spectrums. | |
Parameters | |
---------- | |
`periods` : List, numpy.ndarray | |
List of sampling periods. | |
It can be generated by the static function `Spectrum.periods_standard` . | |
Otherwise, users can specify desired period sampling points. | |
`spectrum` : List, numpy.ndarray | |
List of the spectrum points corresponding to sampling periods. | |
''' | |
def __init__(self, periods, spectrum): | |
self.periods = np.array(periods) | |
'''numpy.ndarray, sampling periods''' | |
self.spectrum = np.array(spectrum) | |
'''numpy.ndarray, spectrum corresponding to sampling periods.''' | |
def match(self, target, start, end, step=0.1): | |
'''Match the current spectrum with a target spectrum. Use Least Square method. | |
Parameters | |
---------- | |
`target` : `Spectrum` | |
the spectrum to be matched. | |
`start`, `end` : float | |
the matching period boundaries. | |
`step` : float | |
evenly sample the periods with the step. | |
Returns | |
------- | |
`coefficient` : float | |
the coefficient to match the current spectrum to the target. | |
`srss` : float | |
the srss after match. | |
''' | |
periods = np.arange(start, end+step, step) | |
this_spectrum = np.array([self.get(p) for p in periods]) | |
that_spectrum = np.array([target.get(p) for p in periods]) | |
coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2) | |
srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5 | |
return coefficient, srss | |
def get(self, period): | |
'''Get the spectrum value. Either a list or a number is accepted. | |
If the period exceeds the largest sampling period, the last spectrum value is returned. | |
The spectrum value not on sampling points are linearly interpolated. | |
Parameters | |
---------- | |
`period` : number, list, numpy.ndarray | |
The interested period. | |
Returns | |
------- | |
`spectrum` : number, numpy.ndarray | |
if period is list-like, return a ndarray. | |
if period is a number, return a number. | |
''' | |
if not hasattr(period, '__iter__'): | |
i = self.periods.searchsorted(period) | |
if self.periods[i] == period: | |
return self.spectrum[i] | |
elif i >= len(self.periods): | |
print("Warning: given period exceeds the sampling range.") | |
return self.spectrum[-1] | |
else: | |
return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period) | |
else: | |
return np.array([self.get(p) for p in period]) | |
def scale(self, factor_x=1, factor_y=1): | |
'''Scale the spectrum. | |
Parameters | |
---------- | |
`factor_x` : number | |
scaling factor in x direction (periods) | |
`factor_y` : number | |
scaling factor in y direction (spectrum) | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
''' | |
return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y) | |
def plot(self, filename=None, engine='pyplot'): | |
'''Plot the spectrum. | |
Parameters | |
---------- | |
`filename` : string, None | |
if None, show it. Else, save it to the filename | |
`engine` : 'pyplot' or 'plotly' | |
Plot engine. | |
Outputs | |
------- | |
A file with name `filename`.png or `filename`.html will be generated. | |
''' | |
if engine == 'pyplot': | |
plt.figure(figsize=(4, 3)) | |
plt.plot(self.periods, self.spectrum, marker='o') | |
plt.xlabel('Period (s)') | |
plt.ylabel('Spetrum') | |
plt.xlim([0, self.periods[-1]]) | |
if filename is None: | |
plt.show() | |
else: | |
plt.savefig(filename+'.png', dpi=200) | |
elif engine == 'plotly': | |
fig = go.Figure() | |
fig.add_scatter(dict( | |
x='self.periods', | |
y='self.spectrum', | |
mode='lines+markers', | |
)) | |
if filename is None: | |
fig.show() | |
else: | |
fig.write_html(filename+'.html') | |
else: | |
print('ERROR: engine should be in "pyplot" or "plotly"') | |
def to_dict(self): | |
'''Convert the instance properties to a dict. | |
Returns | |
------- | |
`it` : dict | |
''' | |
return dict( | |
periods=self.periods.tolist(), | |
spectrum=self.spectrum.tolist() | |
) | |
def to_json(self): | |
'''Convert the properties to json. | |
Returns | |
------- | |
`json` : string | |
''' | |
return json.dumps(self.to_dict()) | |
def save(self, filename): | |
'''Save the spectrum to a file. Currently use json form. | |
Parameters | |
---------- | |
`filename` : string | |
The file name to be saved. | |
The extension '.spectrum' will be automatically added. | |
Outputs | |
------- | |
A file with name `filename`.spectrum will be generated. | |
''' | |
with open(filename + '.spectrum', 'w') as f: | |
f.write(self.to_json()) | |
def write_csv(self, filename): | |
'''Write the spectrum to a csv file. | |
Parameters | |
---------- | |
`filename` : string | |
The extension '.csv' will be automatically added. | |
Outputs | |
------- | |
A file named `filename`.csv will be generated. | |
''' | |
np.savetxt(filename+'.csv', np.vstack( | |
[self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum') | |
@staticmethod | |
def read_csv(filename): | |
'''Create an instance by reading the written csv file. | |
Parameters | |
---------- | |
`filename` : string | |
The filename to read. | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
New instance. | |
''' | |
if not filename.endswith('.csv'): | |
filename += '.csv' | |
data = np.loadtxt(filename, delimiter=',') | |
return Spectrum(periods=data[:, 0], spectrum=data[:, 1]) | |
@staticmethod | |
def read_dict(it): | |
'''Create an instance from the dict. | |
Parameters | |
---------- | |
`it` : dict | |
The dict generated by `Spectrum.to_dict` | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
New instance. | |
''' | |
return Spectrum(periods=it['periods'], spectrum=it['spectrum']) | |
@staticmethod | |
def read_json(string): | |
'''Create an instance from json string. | |
Parameters | |
---------- | |
`string` : str | |
The json string | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
New instance. | |
''' | |
it = json.loads(string) | |
return Spectrum.read_dict(it) | |
@staticmethod | |
def load(filename): | |
'''Load the spectrum from file. | |
Parameters | |
---------- | |
`filename` : str | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
New instance. | |
''' | |
if not filename.endswith('.spectrum'): | |
filename += '.spectrum' | |
with open(filename, 'r') as f: | |
string = f.read() | |
return Spectrum.read_json(string) | |
@staticmethod | |
def periods_standard(until=6): | |
'''Get a standard sampling of the periods. | |
The sampling points are more sparse with the increase of period. | |
Parameters | |
---------- | |
`until` : number | |
The end of the period sampling list. It should >= 4.0. | |
Returns | |
------- | |
`periods` : numpy.ndarray | |
The standard sampling points. | |
''' | |
assert until >= 4 | |
return np.hstack([np.arange(0, 0.2, 0.01), | |
np.arange(0.2, 0.5, 0.02), | |
np.arange(0.5, 1.0, 0.05), | |
np.arange(1.0, 2.0, 0.1), | |
np.arange(2.0, 4.0, 0.2), | |
np.arange(4.0, until+0.5, 0.5)]) | |
@staticmethod | |
def chinese_point(period, intensity, major, site, group, damping=0.05): | |
'''Get spectrum point for Chinese code, given a period. | |
Parameters | |
---------- | |
`period` : number | |
The given period. | |
`intensity` : number | |
Intensity represented by a number. | |
Use 7.5 or 8.5 for half-level intensity. | |
`major` : boolean | |
If True, it's for major (rare) earthquake. | |
If False, for minor (frequent) earthquake. | |
`site` : int | |
Site number (Roman number in the code. 0 for I0, 1 for I1.) | |
`group` : int | |
Site class (Chinese number in the code.) | |
`damping` : float | |
Spectrum damping. | |
Returns | |
------- | |
`value` : float | |
the specific spectrum point value corresponding to the period. | |
''' | |
# alpha max | |
if intensity == 6: | |
alphamax = 0.28 if major else 0.04 | |
elif intensity == 7: | |
alphamax = 0.50 if major else 0.08 | |
elif intensity == 7.5: | |
alphamax = 0.72 if major else 0.12 | |
elif intensity == 8: | |
alphamax = 0.9 if major else 0.16 | |
elif intensity == 8.5: | |
alphamax = 1.2 if major else 0.24 | |
elif intensity == 9: | |
alphamax = 1.40 if major else 0.32 | |
else: | |
raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]") | |
# tg | |
tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65], | |
[0.25, 0.3, 0.4, 0.55, 0.75], | |
[0.3, 0.35, 0.45, 0.65, 0.9]] | |
assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4] | |
tg = tg_table[group-1][site] | |
# other parameters | |
gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping) | |
eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping) | |
eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping) | |
# calculate | |
if period < 0.1: | |
return ((10*eta2 - 4.5) * period + 0.45) * alphamax | |
elif period < tg: | |
return eta2 * alphamax | |
elif period < 5*tg: | |
return (tg/period)**gamma * eta2 * alphamax | |
elif period <= 6: | |
return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax | |
else: | |
return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax | |
@staticmethod | |
def chinese(intensity=8, major=True, site=3, group=2, damping=0.05): | |
'''Return a new spectrum instance that confines to Chinese Code. | |
Use `Spectrum.periods_standard` to get sampling periods. | |
Parameters | |
---------- | |
refer to `Spectrum.chinese_point` | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
''' | |
periods = Spectrum.periods_standard(until=6) | |
spectrum = [Spectrum.chinese_point(p, | |
intensity=intensity, | |
major=major, | |
site=site, | |
group=group, | |
damping=damping) for p in periods] | |
return Spectrum(periods, spectrum) | |
class Motion(object): | |
'''A class plays with ground motion. Store accel time history and accel spectrum only. | |
''' | |
def __init__(self, folder, name, damping=0.05): | |
''' | |
Parameters | |
---------- | |
`folder` : str | |
the folder this motion is stored. | |
`name` : str | |
The motion name. The motion will be saved to '{folder}/{name}.motion' | |
`damping` : float | |
The damping ratio of the motion spectrum. | |
''' | |
self.folder = folder | |
'''str. See `Motion`.''' | |
os.makedirs(folder, exist_ok=True) | |
self.name = name | |
'''str. See `Motion`.''' | |
self.accel = None | |
'''numpy.ndarray. Acceleration time history with evenly distributed sampling points.''' | |
self.dt = None | |
'''float. Acceleration time sampling delta t.''' | |
self.information = '' | |
'''str. Store massive infomation about the motion. ''' | |
self.spectrum = None | |
'''`Spectrum`. The spectrum of the motion.''' | |
self.damping = damping | |
'''float. The damping ratio of the spectrum. Note: init with 0.05.''' | |
def get_PGA(self): | |
''' | |
Returns | |
------- | |
`value` : float | |
The peak ground acceleration. | |
''' | |
return np.max(abs(self.accel)) | |
def get_duration(self): | |
''' | |
Returns | |
------- | |
`value` : float | |
The duration of the motion. | |
''' | |
return (len(self.accel) - 1) * self.dt | |
def get_times(self): | |
''' | |
Returns | |
------- | |
`value` : numpy.ndarray | |
The time series of the motion | |
''' | |
return np.arange(len(self.accel)) * self.dt | |
def generate_spectrum(self, periods=None): | |
'''Use scipy LTS signal processing module to generate the spectrum. | |
Note that the accel spectrum is pseudo-spectrum derived from disp spectrum. | |
So it's only accurate when damping is small. | |
If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated, | |
a blank spectrum will be returned. | |
Parameters | |
---------- | |
`periods` : List-like, None | |
The period sampling points. | |
If is None, `Spectrum.periods_standard` will be called. | |
Returns | |
------- | |
`spectrum` : `Spectrum` | |
The generated spectrum. | |
Outputs | |
------- | |
`Motion.spectrum` will be changed. | |
''' | |
if periods is None: | |
periods = Spectrum.periods_standard() | |
if self.dt is not None: | |
print("Generating spectrum ...") | |
omegas = 2 * np.pi / periods[1:] | |
num = np.array([-1]) | |
spectrum = np.zeros(len(periods)) | |
spectrum[0] = self.get_PGA() | |
for i, omega in enumerate(omegas): | |
den = np.array([1, 2*self.damping*omega, omega**2]) | |
_, y, _ = lsim((num, den), self.accel, self.get_times()) | |
spectrum[i+1] = np.max(abs(y)) * omega ** 2 | |
self.spectrum = Spectrum(periods, spectrum) | |
else: | |
print("Dt is not known. Assigning empty spectrum.") | |
self.spectrum = Spectrum(periods=[], spectrum=[]) | |
return self.spectrum | |
def load_peer(self, peer_filename): | |
'''Load acceleration time-history files downloaded from PEER. | |
The strings in the file will be parsed and also stored in self.information. | |
Parameters | |
---------- | |
`peer_filename` : str | |
Outputs | |
------- | |
`Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed. | |
''' | |
self.information += f'peer_filename: {peer_filename}' | |
try: | |
f = open(peer_filename, 'r') | |
for _ in range(3): | |
self.information += f.readline() | |
s = f.readline() | |
self.information += s | |
re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC") | |
mt = re_dt.match(s) | |
try: | |
self.dt = float(mt.group(1).replace('.', '0.')) | |
print(f'Extracted dt = {self.dt:.4f}') | |
except: | |
print(f'ERROR: cannot find dt from {s}') | |
self.dt = 0.000001 | |
series = [] | |
datastring = f.readlines() | |
for dtline in datastring: | |
for dt in dtline.split(" "): | |
if dt: | |
try: | |
series.append(float(dt.replace(".", "0."))) | |
except ValueError: | |
break | |
self.accel = np.array(series) | |
print(f'Extracted npts = {len(self.accel)}') | |
except IOError: | |
print(f'No file named {self.name} is found.') | |
def convert_peer(self, peer_filename): | |
'''A recipe to convert the peer acceleration file to the .motion file. | |
Parameters | |
---------- | |
`peer_filename` : str | |
`folder` : str | |
the folder to save .motion file. | |
Outputs | |
------- | |
A .motion file will be created in `folder` with the PEER filename. | |
''' | |
print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}') | |
self.load_peer(peer_filename) | |
self.generate_spectrum() | |
self.save() | |
def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None): | |
'''Trim the motion with two coefficients of PGA. | |
This method is used when a long near-zero time-history is stored in the data file. | |
Parameters | |
---------- | |
`new_name` : str | |
`new_folder` : str, None | |
`left`, `right` : float | |
The coefficient of PGA to trim the time history. | |
Search from left or right, when the target point is found, | |
search for the nearest near-zero point. Then trim off the small values. | |
`prepend_zero` : boolean | |
If True, prepend a zero value to the trimmed acceleration. | |
`log` : boolean | |
If the log picture will be generated. | |
`log_folder` : str, None | |
Path for the folder to save the trim logs. | |
If None, save in the previous folder. | |
Returns | |
------- | |
`motion` : `Motion` | |
The trimmed motion. | |
Outputs | |
------- | |
If log, plot a figure in log_folder named '{new_name}_trimlog.png' | |
''' | |
mo = self.copy() | |
mo.name = new_name | |
mo.folder = new_folder if new_folder is not None else self.folder | |
pga = self.get_PGA() | |
# search left | |
i = 0 | |
while abs(self.accel[i]) < pga * left: | |
i += 1 | |
while i > 0 and self.accel[i] * self.accel[i-1] > 0: | |
i -= 1 | |
i_left = i | |
# search right | |
i = -1 | |
while abs(self.accel[i]) < pga * right: | |
i -= 1 | |
while i < -1 and self.accel[i] * self.accel[i+1] > 0: | |
i += 1 | |
i_right = len(self.accel) + i | |
if prepend_zero: | |
mo.accel = np.hstack([0, self.accel[i_left:i_right]]) | |
else: | |
mo.accel = self.accel[i_left:i_right] | |
mo.generate_spectrum() | |
print( | |
f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') | |
if log: | |
log_folder = self.folder if log_folder is None else log_folder | |
os.makedirs(log_folder, exist_ok=True) | |
_, ax = plt.subplots(3, 1, figsize=(7, 5)) | |
ax[0].plot(self.get_times(), self.accel, label='before') | |
ax[1].plot(mo.get_times(), mo.accel, label='after') | |
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') | |
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') | |
for i in range(3): | |
ax[i].legend() | |
plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') | |
plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200) | |
return mo | |
def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None): | |
'''Change dt for the accel. Linearly interpolate the acceleration, and re-sample. | |
Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum. | |
Parameters | |
---------- | |
`dt` : float | |
New sampling delta t. | |
`new_name` : str | |
`new_folder` : str, None | |
`log` : boolean | |
If log, create a log picture comparing the before and after accels and spectrum. | |
`log_folder` : str, None | |
Path for the folder to save the trim logs. | |
If None, save in the previous folder. | |
Returns | |
------- | |
`motion` : `Motion` | |
Outputs | |
------- | |
If log, create a picture named '{log_folder}/{new_name}_dtlog.png'. | |
''' | |
if dt == self.dt: | |
print(f'Motion {self.name} do not need to change dt') | |
return self.copy() | |
else: | |
mo = self.copy() | |
mo.name = new_name | |
mo.folder = new_folder if new_folder is not None else self.folder | |
f = interp1d(self.get_times(), self.accel) | |
npts = np.floor(self.dt * len(self.accel) / dt) | |
mo.accel = f(np.arange(npts)*dt) | |
mo.dt = dt | |
mo.generate_spectrum() | |
print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.') | |
if log: | |
log_folder = self.folder if log_folder is None else log_folder | |
os.makedirs(log_folder, exist_ok=True) | |
_, ax = plt.subplots(3, 1, figsize=(7, 5)) | |
ax[0].plot(self.get_times(), self.accel, label='before') | |
ax[1].plot(mo.get_times(), mo.accel, label='after') | |
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') | |
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') | |
for i in range(3): | |
ax[i].legend() | |
plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.') | |
plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200) | |
return mo | |
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None): | |
'''Truncate the acceleration. | |
Sometimes, a ground motion file may have two peaks or more. | |
So if only one peak is needed, this method can be used to skip the other peaks. | |
Parameters | |
---------- | |
`time` : float | |
The accels after the time will be cut off. | |
`new_name` : str | |
`new_folder` : str, None | |
`log` : boolean | |
`log_folder` : str, None | |
Returns | |
------- | |
`motion` : `Motion` | |
Outputs | |
------- | |
If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created. | |
''' | |
i = round(time / self.dt) | |
mo = self.copy() | |
mo.name = new_name | |
mo.folder = new_folder if new_folder is not None else self.folder | |
if i >= len(self.accel): | |
print(f"WARNING: time is longer than duration. {self.name} is not truncated.") | |
return mo | |
mo.accel = self.accel[:i] | |
mo.generate_spectrum() | |
print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') | |
if log: | |
log_folder = self.folder if log_folder is None else log_folder | |
os.makedirs(log_folder, exist_ok=True) | |
_, ax = plt.subplots(3, 1, figsize=(7, 5)) | |
ax[0].plot(self.get_times(), self.accel, label='before') | |
ax[1].plot(mo.get_times(), mo.accel, label='after') | |
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before') | |
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after') | |
for i in range(3): | |
ax[i].legend() | |
plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.') | |
plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200) | |
return mo | |
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1): | |
'''Scale the ground motion. | |
Parameters | |
---------- | |
`new_name` : str | |
`new_folder` : str, None | |
`factor_x` : float | |
The scaling factor on the time/period axis. | |
`factor_y` : float | |
The scaling factor on the accel axis. | |
Returns | |
------- | |
`motion` : Motion | |
''' | |
mo = self.copy() | |
mo.name = new_name | |
mo.folder = new_folder if new_folder is not None else self.folder | |
if factor_x != 1: | |
mo.dt = self.dt * factor_x | |
if factor_y != 1: | |
mo.accel = self.accel * factor_y | |
mo.spectrum = self.spectrum.scale(factor_x, factor_y) | |
return mo | |
def plot_timehistory(self, folder=None, save=True, engine='pyplot'): | |
'''Plot the time history of the acceleration. | |
Parameters | |
---------- | |
`folder` : str, None | |
Store folder. If None, use `Motion.folder` | |
`save` : boolean | |
Save the figure or not. | |
`engine` : 'pyplot' or 'plotly' | |
Outputs | |
------- | |
if save is True, create a figure '{folder}/{self.name}_th'. | |
''' | |
folder = self.folder if folder is None else folder | |
if engine == 'pyplot': | |
plt.figure() | |
t = self.get_times() | |
plt.plot(t, self.accel) | |
if save: | |
plt.savefig(f'{folder}/{self.name}_th.png', dpi=200) | |
else: | |
plt.show() | |
elif engine == 'plotly': | |
fig = go.Figure() | |
fig.add_trace(go.Scatter( | |
x=self.get_times(), | |
y=self.accel, | |
mode='lines', | |
name=self.name | |
)) | |
if save: | |
fig.write_html(f'{folder}/{self.name}_th.html') | |
else: | |
fig.show() | |
else: | |
print('ERROR: engine should be in "pyplot" or "plotly"') | |
def plot(self, folder=None, save=True, engine='pyplot'): | |
'''Plot the time history and the spectrum. | |
Parameters | |
---------- | |
See `Motion.plot_timehistory` | |
Outputs | |
------- | |
Create a figure named '{folder}/{name}' | |
''' | |
folder = self.folder if folder is None else folder | |
if engine == 'pyplot': | |
_, ax = plt.subplots(2, 1, figsize=(7, 8)) | |
ax[0].plot(self.get_times(), self.accel) | |
ax[0].set_xlabel('Time(s)') | |
ax[0].set_ylabel('Acceleration(g)') | |
ax[0].set_title(f'PGA={self.get_PGA():.4f}') | |
ax[1].plot(self.spectrum.periods, self.spectrum.spectrum) | |
ax[1].set_xlabel('Period(s)') | |
ax[1].set_ylabel('Spectrum Acceleration(g)') | |
ax[1].set_title(f'damping={self.damping:.4f}') | |
plt.subplots_adjust(hspace=0.3) | |
if save: | |
plt.savefig(f'{folder}/{self.name}.png', dpi=200) | |
else: | |
plt.show() | |
elif engine == 'plotly': | |
fig = make_subplots(rows=1, cols=2) | |
fig.add_trace(go.Scatter( | |
x=self.get_times(), | |
y=self.accel, | |
mode='lines', | |
name='accel' | |
), row=1, col=2) | |
fig.add_trace(go.Scatter( | |
x=self.spectrum.periods, | |
y=self.spectrum.spectrum, | |
mode='lines', | |
name='spectrum' | |
), row=1, col=1) | |
if save: | |
fig.write_html(f'{folder}/{self.name}.html') | |
else: | |
fig.show() | |
else: | |
print('Engine should be in "pyplot" or "plotly"') | |
def write_accel(self, filename, time=True): | |
'''Write the acceleration timehistory to a single file. | |
Parameters | |
---------- | |
`filename` : str | |
The file path to store the accelration. | |
`time` : boolean | |
If True, a time column is created. | |
To know dt in the file content, time should be True. | |
Otherwise, dt should be stored in the filename. | |
Outputs | |
------- | |
An text file with acceleration information will be written. | |
''' | |
if time: | |
data = np.vstack([self.get_times(), self.accel]) | |
np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel") | |
else: | |
np.savetxt(filename, self.accel, fmt="%.7e") | |
def write_info(self, filename): | |
'''Write the information to a single file. | |
Parameters | |
---------- | |
`filename` : str | |
Outputs | |
------- | |
A text file with information will be created. | |
''' | |
with open(filename, 'w') as f: | |
f.write(f'dt={self.dt:.3}\n') | |
f.write(f'npts={len(self.accel)}\n') | |
f.write(f'damping={self.damping:.3}\n') | |
f.write(self.information) | |
def write_spectrum(self, filename): | |
'''Write the spectrum to a single file. Use the form of csv. | |
Parameters | |
---------- | |
`filename` : str | |
Outputs | |
------- | |
A csv file will be created. | |
''' | |
self.spectrum.write_csv(filename) | |
def read_accel(self, filename, dt=True): | |
'''Load an single-column acceleration file to the current instance. | |
Parameters | |
---------- | |
`dt` : float, boolean | |
if dt is float, `Motion.dt` will be set to dt. | |
Else if dt is True, `Motion.dt` will be found in the file. | |
Outputs | |
------- | |
Change `Motion.dt` and `Motion.accel` in the CURRENT instance. | |
''' | |
if not isinstance(dt, float): | |
data = np.loadtxt(filename) | |
self.accel = data[:, 1] | |
self.dt = data[1, 0] | |
else: | |
self.accel = np.loadtxt(filename) | |
self.dt = dt | |
def read_spectrum(self, filename): | |
'''Read the spectrum from csv file. | |
Parameters | |
---------- | |
`filename` : str | |
Outputs | |
------- | |
The `Motion.spectrum` of the CURRENT instance will be changed. | |
''' | |
spectrum = Spectrum.read_csv(filename) | |
self.spectrum = spectrum | |
def read_info(self, filename): | |
'''Read info from text file. | |
Parameters | |
---------- | |
`filename` : str | |
Outputs | |
------- | |
The `Motion.information` of the CURRENT instance will be changed. | |
''' | |
with open(filename, 'r') as f: | |
self.dt = float(f.readline().replace("dt=", "")) | |
_ = int(f.readline().replace("npts=", "")) | |
self.damping = float(f.readline().replace("damping=", "")) | |
self.information = f.read() | |
def to_dict(self): | |
'''Convert the instance properties to dict. | |
Returns | |
------- | |
`it` : dict | |
''' | |
return dict( | |
dt=self.dt, | |
npts=len(self.accel), | |
damping=self.damping, | |
information=self.information, | |
accel=self.accel.tolist(), | |
spectrum=self.spectrum.to_dict(), | |
folder=self.folder, | |
name=self.name, | |
) | |
def to_json(self): | |
'''Convert the instance properties to string using json format. | |
Returns | |
------- | |
`string` : str | |
''' | |
return json.dumps(self.to_dict()) | |
def save(self): | |
'''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name` | |
Outputs | |
------- | |
A file named '{self.folder}/{self.name}.motion' will be created. | |
''' | |
with open(f'{self.folder}/{self.name}.motion', 'w') as f: | |
f.write(self.to_json()) | |
@staticmethod | |
def read_dict(it): | |
'''Create a instance by the dict generated by `Motion.to_dict`. | |
Parameters | |
---------- | |
`it` : dict | |
Returns | |
------- | |
`mo` : `Motion` | |
''' | |
mo = Motion(folder=it['folder'], name=it['name']) | |
mo.dt = it['dt'] | |
mo.damping = it['damping'] | |
mo.information = it['information'] | |
mo.accel = np.array(it['accel']) | |
mo.spectrum = Spectrum.read_dict(it['spectrum']) | |
return mo | |
@staticmethod | |
def read_json(string): | |
'''Create a new instance by the string generated by `Motion.to_json`. | |
Parameters | |
---------- | |
`string` : str | |
Returns | |
------- | |
`mo` : `Motion` | |
''' | |
it = json.loads(string) | |
return Motion.read_dict(it) | |
@staticmethod | |
def load(filename): | |
'''Load the file created from `Motion.save` method. | |
Parameters | |
---------- | |
`filename` : str | |
Returns | |
------- | |
`mo` : `Motion` | |
''' | |
if not filename.endswith('.motion'): | |
filename += '.motion' | |
with open(filename, 'r') as f: | |
return Motion.read_json(f.read()) | |
def copy(self): | |
'''Copy the instance. | |
Returns | |
------- | |
`mo` : `Motion` | |
''' | |
return Motion.read_dict(self.to_dict()) | |
class Suite(object): | |
'''A class that contains mainly a series of `Motion` instances, | |
and corresponding series of factors, matching the suite of motions to the target `Spectrum`. | |
''' | |
def __init__(self, folder, name, target_spectrum, period_bound): | |
''' | |
Parameters | |
---------- | |
`folder` : str | |
A path-like string for the suite. | |
`name` : str | |
A specific name for the suite. The instance will be saved in '{folder}/{name}.suite' | |
`target_spectrum` : `Spectrum` | |
The target spectrum of the suite. | |
`period_bound` : List<float> len=2 | |
The matching period boundaries. | |
''' | |
self.folder = folder | |
'''str. See `Suite`''' | |
os.makedirs(folder, exist_ok=True) | |
self.name = name | |
'''str. See `Suite`''' | |
self.motions = [] | |
'''List<`Motion`>. Store all the motions.''' | |
self.factors = [] | |
'''List<float>. Store all the factors that should be applied to the motions.''' | |
self.target_spectrum = target_spectrum | |
'''`Spectrum`. See `Suite`''' | |
self.period_bound = period_bound | |
'''List<float>. See `Suite`''' | |
self.period_samples = np.linspace(period_bound[0], period_bound[1], 200) | |
'''numpy.ndarray. The period within the `period_bound` is evenly sampled, with 200 points.''' | |
self.spectrum_matrix = None | |
'''numpy.ndarray. A matrix with shape (len(`Suite.period_samples`), len(`Suite.motions`)). | |
This matrix times the `Suite.factors` vector products the average spectrum with `Suite.period_samples`.''' | |
self.target_vector = target_spectrum.get(self.period_samples) | |
'''numpy.ndarray. The vector generated by the target_spectrum sampling the `Suite.period_samples`''' | |
def set_folder(self, folder): | |
'''Reset the folder for the suite and the motions. | |
Parameters | |
---------- | |
`folder` : str | |
Outputs | |
------- | |
`Suite.folder` and `Suite.motions` will be changed. | |
''' | |
if folder is not None: | |
self.folder = folder | |
os.makedirs(folder, exist_ok=True) | |
for mo in self.motions: | |
mo.folder = folder | |
def add_motion(self, motion, factor=1): | |
'''Add motion to the suite together with a factor, to keep the lengths are identical. | |
Parameters | |
---------- | |
`motion` : `Motion` | |
`factor` : number | |
Outputs | |
------- | |
`Suite.motions` and `Suite.factors` will be changed. | |
''' | |
self.motions.append(motion) | |
self.factors.append(factor) | |
def load_motions_from_names(self, names, folder=None): | |
'''Give a list of name and the folder, load motions. | |
Parameters | |
---------- | |
`names` : List<str> | |
The list of motion names. | |
`folder` : str, None | |
The folder of the motions. | |
If is None, use `Suite.folder` | |
Outputs | |
------- | |
`Suite.motions` and `Suite.factors` will be changed. | |
''' | |
folder = self.folder if folder is None else folder | |
for name in names: | |
mo = Motion.load(f'{folder}/{name}.motion') | |
self.add_motion(mo) | |
print(f'Loaded motion {mo.name}') | |
self.reset_spectrum_matrix() | |
def load_motions_from_folder(self, folder=None): | |
'''Find all the .motion file from the given folder, and load them. | |
Parameters | |
---------- | |
`folder` : str, None | |
The folder to search the .motion files. | |
If is None, use `Suite.folder` instead. | |
Outputs | |
------- | |
`Suite.motions` and `Suite.factors` will be changed. | |
''' | |
folder = self.folder if folder is None else folder | |
filenames = os.listdir(self.folder) | |
for filename in filenames: | |
if filename.endswith('.motion'): | |
mo = Motion.load(f'{folder}/{filename}') | |
self.add_motion(mo) | |
print(f'Loadad motion {mo.name}') | |
self.reset_spectrum_matrix() | |
def filter_by_IDs(self, ids, new_name, new_folder=None): | |
'''Give a series of selected IDs, filter the motions. | |
Parameters | |
---------- | |
`ids` : List<int> | |
The list of indexes of the selected motions in the current suite. | |
`new_name` : str | |
`new_folder` : str, None | |
Returns | |
------- | |
`suite` : `Suite` | |
A new suite with the selected motions and their factors only. | |
''' | |
suite = self.copy() | |
suite.name = new_name | |
suite.set_folder(new_folder) | |
suite.motions = [self.motions[i] for i in ids] | |
suite.factors = [self.factors[i] for i in ids] | |
suite.reset_spectrum_matrix() | |
print(f'The motions have been filtered. {len(ids)} motions remain.') | |
return suite | |
def filter_by_file(self, filename, new_name, new_folder=None): | |
'''Give a file with indexes selected. | |
Parameters | |
---------- | |
Refer to `Suite.filter_by_IDs`. | |
Returns | |
------- | |
Refer to `Suite.filter_by_IDs`. | |
''' | |
with open(filename, 'r') as f: | |
lines = f.readlines() | |
ids = [int(l) for l in lines] | |
return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder) | |
def get_average_spectrum(self): | |
'''Return a spectrum which is the average spectrum of the suite. | |
Outputs | |
------- | |
`spectrum` : `Spectrum` | |
''' | |
periods = self.motions[0].spectrum.periods | |
average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0] | |
for i in range(1, len(self.motions)): | |
average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i] | |
average_spectrum /= len(self.motions) | |
return Spectrum(periods, average_spectrum) | |
def get_srss(self): | |
'''Get the srss of the current averate spectrum to the target spectrum in the period boundary. | |
Returns | |
------- | |
`srss` : float | |
''' | |
return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item() | |
def reset_spectrum_matrix(self): | |
'''Reset the spectrum matrix, which produces the average spectrum | |
in the boundary by multiplying factors vector. | |
This method should be called everytime `Suite.motions` change. | |
Outputs | |
------- | |
`Suite.spectrum_matrix` and `Suite.target_vector` will be changed. | |
''' | |
mat = np.zeros((len(self.period_samples), len(self.motions))) | |
for i in range(len(self.motions)): | |
spectrum = self.motions[i].spectrum.get(self.period_samples) | |
mat[:, i] = spectrum / len(self.motions) | |
self.spectrum_matrix = mat | |
self.target_vector = self.target_spectrum.get(self.period_samples) | |
def eliminate_neg(self): | |
'''Eliminate negative target vector by multiplying a factor for every individual. | |
In other words, make the average spectrum above the target spectrum. | |
Returns | |
------- | |
`factor` : float | |
The global factor that should be applied to each of the factors. | |
Outputs | |
------- | |
`Suite.factors` are all amplified by the returned `factor`. | |
''' | |
error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector | |
argmin = np.argmin(error) | |
factor = 1 / (1 + error[argmin] / self.target_vector[argmin]) | |
print(f'Multiply {factor:.4} to eliminate negative spectrum.') | |
self.factors = [self.factors[i] * factor for i in range(len(self.factors))] | |
return factor | |
def match_individual_LSQ(self): | |
'''Use Least square method for each individual motion to match the spectrum. | |
Returns | |
------- | |
`residuals` : numpy.ndarray | |
srss of each motion matched to the target spectrum. | |
Outputs | |
------- | |
`Suite.factors` are changed. | |
''' | |
residuals = np.zeros(len(self.motions)) | |
for i in range(len(self.motions)): | |
this = self.spectrum_matrix[:, i] * len(self.motions) | |
that = self.target_vector | |
factor = np.dot(this, that) / np.dot(this, this) | |
residual = np.linalg.norm(this * factor - that) | |
self.factors[i] = factor | |
residuals[i] = residual | |
print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}") | |
return residuals | |
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000): | |
'''Use Monte Carlo simulation and Least Square method to optimize the suite. | |
after running, a bunch of suites will be written to the new_folder. | |
Parameters | |
---------- | |
`count` : int | |
The number of motions in the resulting suite. | |
`times` : int | |
The number of times that MC simulation with dimension is run. | |
The total number of samples of MC simulation will be {times} * {dimension}. | |
`use_lsq` : boolean | |
Whether use LSQ optimizer to better optimize the suite. | |
`output_count` : int | |
The number of outputs. if 0, output all. | |
`new_folder` : str | |
`lower_bound`, `upper_bound` : float | |
For the purpose that the individual motions are not too far from the individually matched factors, | |
the lower and upper boundaries of amplifying the individually matched factors are set. | |
This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer. | |
`dimension` : int | |
Each time the Monte Carlo simulation is run, | |
means that {dimension} times of simulation have been done by using a matrix target. | |
Returns | |
------- | |
List<`Suite`> with len=`output_count`. | |
The list is sorted. The best answer is the first result. However, visual check should be done. | |
Outputs | |
------- | |
Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted. | |
''' | |
new_folder = new_folder if new_folder is not None else self.folder | |
os.makedirs(new_folder, exist_ok=True) | |
suites = [] | |
srsses = [] | |
for i in range(times): | |
suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder) | |
suites.append(suite_new) | |
srsses.append(suite_new.get_srss()) | |
if use_lsq: | |
suite_new2 = suite_new.optimize_LSQ( | |
new_name=f'lsq{i:03d}', | |
new_folder=new_folder, | |
lower_bound=lower_bound, | |
upper_bound=upper_bound | |
) | |
suites.append(suite_new2) | |
srsses.append(suite_new2.get_srss()) | |
argsorted = np.argsort(np.array(srsses)) | |
results = [suites[i] for i in argsorted[:output_count]] | |
for i, suite in enumerate(results): | |
suite.name = f'no{i}-{suite.name}' | |
suite.save() | |
suite.plot_all_spectrums(engine='pyplot') | |
return results | |
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000): | |
'''Use Monte Carlo simulation to filter the suite to count numbers of motions. | |
Parameters | |
---------- | |
Refer to `Suite.filter_optimize`. | |
Returns | |
------- | |
`suite` : `Suite` | |
The optimized result after {dimension} times of MC simulations. | |
''' | |
print(f'Running Monte Carlo simulation, dimension={dimension}') | |
self.match_individual_LSQ() | |
factors = np.array(self.factors) | |
xs = np.zeros((len(self.motions), dimension)) | |
for i in range(dimension): | |
indexes = random.sample(range(len(self.motions)), count) | |
xs[indexes, i] = factors[indexes] | |
mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs) | |
loss = mul - self.target_vector.reshape(-1, 1) | |
argmin = np.argmin(loss, axis=0) | |
lossmin = np.min(loss, axis=0) | |
target_at_lossmin = self.target_vector[argmin] | |
amps = 1 / (lossmin / target_at_lossmin + 1) | |
lossAmp = mul * amps - self.target_vector.reshape(-1, 1) | |
srss = np.linalg.norm(lossAmp, axis=0) | |
arg = np.argmin(srss) | |
factors = xs[:, arg] * amps[arg] | |
suvives = np.where(factors > 1.0e-3) | |
suite = self.copy() | |
suite.name = new_name | |
suite.set_folder(new_folder) | |
suite.motions = [self.motions[i] for i in suvives[0]] | |
suite.factors = [factors[i] for i in suvives[0]] | |
suite.reset_spectrum_matrix() | |
return suite | |
def _loss(self, x): | |
'''Utility function: calculate the loss for optimization.''' | |
factors = x * np.array(self.factors) | |
mul = np.matmul(self.spectrum_matrix, factors) | |
loss = mul - self.target_vector | |
argmin = np.argmin(loss) | |
amp = 1 / (loss[argmin] / self.target_vector[argmin] + 1) | |
return mul * amp - self.target_vector | |
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4): | |
'''Use Least Square method from scipy to optimize the factors. | |
Parameters | |
---------- | |
Refer to `Suite.filter_optimize`. | |
Returns | |
------- | |
`suite` : `Suite` | |
Optimized suite. | |
''' | |
print('Running Bounded Least Square optimization ...') | |
res = least_squares(self._loss, | |
np.ones(len(self.motions)), | |
bounds=(lower_bound, upper_bound), | |
verbose=0) | |
factors = res.x * self.factors | |
suite = self.copy() | |
suite.name = new_name | |
suite.set_folder(new_folder) | |
suite.factors = factors.tolist() | |
suite.eliminate_neg() | |
return suite | |
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False): | |
'''From a folder load all the PEER motions into the suite. | |
Extract the downloaded peer zip file first. | |
Parameters | |
---------- | |
`peer_folder` : str | |
The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found. | |
`h1`, `h2`, `v` : boolean | |
Whether load the three directional files or not. | |
Outputs | |
------- | |
`Suite.motions`, `Suite.factors` will be changed. | |
All the PEER motions will be saved in .motion format in the new folder. | |
''' | |
with open(f'{peer_folder}/_SearchResults.csv', "r") as f: | |
for _ in range(34): | |
f.readline() | |
i = 0 | |
positions = [] | |
if h1: | |
positions.append(19) | |
if h2: | |
positions.append(20) | |
if v: | |
positions.append(21) | |
line = f.readline() | |
while line != '\n': | |
cells = line.split(',') | |
for pos in positions: | |
filename = cells[pos].replace(' ', '') | |
motion_name = filename.replace('.AT2', '') | |
print(f'Loading {motion_name}') | |
mo = Motion(folder=self.folder, name=motion_name) | |
mo.load_peer(peer_filename=f'{peer_folder}/{filename}') | |
mo.generate_spectrum() | |
mo.save() | |
self.add_motion(mo) | |
i += 1 | |
line = f.readline() | |
self.reset_spectrum_matrix() | |
print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.") | |
return True | |
def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True): | |
'''Beautify the suite in 4 steps: | |
1. truncate the motions, see `Motion.truncate` | |
2. trim the near-zero parts, see `Motion.trim` | |
3. change dt to uniform, see `Motion.change_dt` | |
4. rename the motions to a series. | |
This procedure is always logged to the current folder. | |
Parameters | |
---------- | |
`new_name` : str | |
`run_steps` : List<int> | |
The step number to run. Users can select which one out of the 4 steps to run. | |
The order will also follow this List. | |
`dt` : float | |
The uniform delta t. | |
`new_folder` : str | |
`trunc_dict` : dict | |
A dict that defines the `time` parameter in `Motion.truncate` | |
In the dict, keys are the indexes of the motions in the suite, values are the `time` values. | |
`left`, `right` : float | |
The parameters of PGA to trim. Refer to `Motion.trim`. | |
Returns | |
------- | |
`suite` : `Suite` | |
Outputs | |
------- | |
All the logs and the pre- and post- beautify average spectrum. | |
''' | |
suite = self.copy() | |
spectrum_hist = suite.get_average_spectrum() | |
suite.plot_all_spectrums(engine='pyplot') | |
for i, motion in enumerate(self.motions): | |
mo = motion.copy() | |
mo.name = f'{self.name}-{i}-b' | |
for step in run_steps: | |
if step == 1: | |
if i in trunc_dict: | |
mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1') | |
elif step == 2: | |
mo = mo.trim(new_name=mo.name+'2', left=left, right=right) | |
elif step == 3: | |
mo = mo.change_dt(dt=dt, new_name=mo.name+'3') | |
elif step == 4: | |
mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}' | |
else: | |
print(f'step={step} is not recognized and ignored.') | |
suite.motions[i] = mo | |
spectrum_new = suite.get_average_spectrum() | |
plt.figure() | |
plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before') | |
plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after') | |
plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target') | |
plt.legend() | |
plt.suptitle(f'Beautify: {self.name} -> {new_name}') | |
plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200) | |
suite.name = new_name | |
if new_folder is not None: | |
suite.set_folder(new_folder) | |
return suite | |
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True): | |
'''Scale the motions both horizontally and vertically. | |
Parameters | |
---------- | |
`new_name` : str | |
`new_folder` : str, None | |
`factor_x`, `factor_y` : float | |
The factors to amplify in x axis and y axis. | |
`scale_target_spectrum` : boolean | |
Whether scale the target spectrum as well. | |
Returns | |
------- | |
`suite` : `Suite` | |
''' | |
suite = self.copy() | |
suite.name = new_name | |
suite.set_folder(new_folder) | |
for i, motion in enumerate(self.motions): | |
suite.motions[i] = motion.scale( | |
new_name=motion.name, factor_x=factor_x, factor_y=factor_y) | |
if scale_target_spectrum: | |
suite.target_spectrum = self.target_spectrum.scale( | |
factor_x=factor_x, factor_y=factor_y) | |
if factor_x != 1: | |
suite.period_bound = [p * factor_x for p in self.period_bound] | |
suite.period_samples = self.period_samples * factor_x | |
if factor_y != 1: | |
suite.factors = [f*factor_y for f in self.factors] | |
suite.reset_spectrum_matrix() | |
return suite | |
def write_TCL(self, folder=None, filename=None): | |
'''Write the suite to TCL forms for OpenSees. | |
Currently the TCL contains only one dict. | |
The keys of the dict is the motion names. Then the essential properties are stored. | |
At the same time, the accelerations for each motion is written to a text file. | |
Parameters | |
---------- | |
`folder` : str, None | |
The folder to write the tcl files. | |
If is None, use `Suite.folder`. | |
`filename` : str, None | |
The tcl file name. If is None, use `Suite.name`. | |
Outputs | |
------- | |
A tcl file and a series of acceleration files. | |
''' | |
folder = self.folder if folder is None else folder | |
filename = self.name if filename is None else filename.replace( | |
'.tcl', '') + '.tcl' | |
os.makedirs(folder, exist_ok=True) | |
config = '' | |
config += 'dict set motion keys {' | |
names = [mo.name for mo in self.motions] | |
config += ' '.join(names) | |
config += '}\n\n' | |
for i, mo in enumerate(self.motions): | |
mo.write_accel(f'{folder}/{mo.name}', time=False) | |
config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n' | |
config += f'dict set motion {mo.name} npts {len(mo.accel)}\n' | |
config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n' | |
config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n' | |
with open(f'{folder}/{filename}.tcl', 'w') as f: | |
f.write(config) | |
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'): | |
'''Plot amplified accelograms to a single file. | |
Parameters | |
---------- | |
`folder` : str, None | |
If is None, use `Suite.folder`. | |
`filename` : str, None | |
If is None, use `Suite.name` + '_accel' | |
`save` : boolean | |
If True, save the file. Else, show immediately. | |
`engine` : 'pyplot' or 'plotly' | |
Outputs | |
------- | |
A figure is plotted. | |
''' | |
folder = self.folder if folder is None else folder | |
if engine == 'plotly': | |
filename = f'{self.name}_accel.html' if filename is None else filename + '.html' | |
fig = go.Figure() | |
for i, motion in enumerate(self.motions): | |
fig.add_trace(go.Scatter( | |
x=np.arange(len(motion.accels)) * motion.dt, | |
y=motion.accels, | |
mode='lines', | |
name=str(i) + motion.name, | |
text=motion.name | |
)) | |
if save: | |
fig.write_html(f'{folder}/{filename}') | |
else: | |
fig.show() | |
elif engine == 'pyplot': | |
print('Pyplot is not supported.') | |
else: | |
print('ERROR: engine should be in "pyplot" and "plotly"') | |
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'): | |
'''Plot all amplified spectrums in one file. | |
Parameters | |
---------- | |
Refer to `Suite.plot_all_accels`. | |
The filename if is None will be `Suite.name` + '_spectrum' | |
Outputs | |
------- | |
A figure will be generated. | |
''' | |
if engine == 'plotly': | |
folder = self.folder if folder is None else folder | |
filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html' | |
fig = go.Figure() | |
for i, motion in enumerate(self.motions): | |
fig.add_trace(go.Scatter( | |
x=motion.spectrum.periods, | |
y=motion.spectrum.spectrum * self.factors[i], | |
mode='lines', | |
name=motion.name, | |
text=motion.name | |
)) | |
spectrum1 = self.target_spectrum | |
spectrum2 = self.get_average_spectrum() | |
fig.add_trace(go.Scatter( | |
x=spectrum1.periods, | |
y=spectrum1.spectrum, | |
mode='lines', | |
name='Target', | |
text='Target', | |
line=dict(width=5), | |
)) | |
fig.add_trace(go.Scatter( | |
x=spectrum2.periods, | |
y=spectrum2.spectrum, | |
mode='lines', | |
name='Average', | |
text='Average', | |
line=dict(width=5), | |
)) | |
if save: | |
fig.write_html(f'{folder}/{filename}') | |
else: | |
fig.show() | |
elif engine == 'pyplot': | |
folder = self.folder if folder is None else folder | |
filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png' | |
plt.figure(figsize=(7, 5)) | |
for i, motion in enumerate(self.motions): | |
plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver') | |
plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3) | |
averageSpectrum = self.get_average_spectrum() | |
plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2) | |
plt.legend(loc='best', fontsize='x-small') | |
plt.title(f'SRSS={self.get_srss():.4f}') | |
if save: | |
plt.savefig(os.path.join(folder, filename), dpi=200) | |
else: | |
plt.show() | |
else: | |
print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.") | |
def plot_individual(self, save=True): | |
'''Plot all individual ground motions and the spectrum. | |
Parameters | |
---------- | |
Refer to `Suite.plot_all_accels`. | |
Outputs | |
------- | |
If save, a figure for each motion will be generated. | |
''' | |
for i, mo in enumerate(self.motions): | |
_, ax = plt.subplots(2, 1, figsize=(7, 5)) | |
ax[0].plot(np.arange(len(mo.accel)) * mo.dt, | |
mo.accel * self.factors[i]) | |
ax[1].plot(mo.spectrum.periods, | |
mo.spectrum.spectrum * self.factors[i]) | |
ax[1].plot(self.target_spectrum.periods, | |
self.target_spectrum.spectrum, linewidth=2) | |
plt.suptitle( | |
f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}') | |
filename = f'{self.folder}/motion{i}.png' | |
if save: | |
plt.savefig(filename, dpi=200) | |
else: | |
plt.show() | |
def plot_interactive(self, save=True): | |
'''Plot the spectrum and the acceleration time-histories to an interactive html file. | |
Parameters | |
---------- | |
`save` : boolean | |
If True, write to html file. Otherwise, show directly without saving. | |
Outputs | |
------- | |
A html file with interactive plotting sliders. | |
''' | |
fig = make_subplots(rows=1, cols=2) | |
for i in range(len(self.motions)): | |
fig.add_trace(go.Scatter( | |
x=self.motions[i].spectrum.periods, | |
y=self.motions[i].spectrum.spectrum * self.factors[i], | |
name=self.motions[i].name, | |
), row=1, col=1) | |
fig.add_trace(go.Scatter( | |
x=self.motions[i].get_times(), | |
y=self.motions[i].accel * self.factors[i], | |
name=self.motions[i].name, | |
), row=1, col=2) | |
spectrum_average = self.get_average_spectrum() | |
fig.add_trace(go.Scatter( | |
x=spectrum_average.periods, | |
y=spectrum_average.spectrum, | |
name='average', | |
line={'width': 5} | |
)) | |
fig.add_trace(go.Scatter( | |
x=self.target_spectrum.periods, | |
y=self.target_spectrum.spectrum, | |
name='target', | |
line={'width': 5} | |
)) | |
def _title(i): | |
return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}' | |
steps = [] | |
for i in range(len(self.motions)): | |
step = dict( | |
method='update', | |
args=[ | |
{'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]}, | |
{'title': _title(i)} | |
], | |
label=str(i), | |
) | |
steps.append(step) | |
sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})] | |
updatemenus = [dict( | |
buttons=[dict( | |
label='All', | |
method='update', | |
args=[ | |
{'visible': [True] * (len(self.motions)*2+2)}, | |
{'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'} | |
] | |
)], | |
type='buttons', | |
x=1, | |
y=0, | |
xanchor='right', | |
yanchor='top', | |
pad={'t': 30}, | |
)] | |
fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus) | |
if save: | |
fig.write_html(f'{self.folder}/{self.name}.html') | |
else: | |
fig.show() | |
def to_dict(self, separate_motion=True): | |
'''Convert all the properties to dict. | |
Parameters | |
---------- | |
`separate_motion` : boolean | |
If True, the motion details are not included into the dict. | |
User has to make sure these motions are not moved. | |
Returns | |
------- | |
`it` : dict | |
''' | |
return dict( | |
folder=self.folder, | |
name=self.name, | |
motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [ | |
f'{mo.folder}/{mo.name}' for mo in self.motions], | |
factors=self.factors if type(self.factors[0] == float) else [ | |
fact.item() for fact in self.factors], | |
period_bound=self.period_bound, | |
target_spectrum=self.target_spectrum.to_dict(), | |
period_samples=self.period_samples.tolist(), | |
srss=self.get_srss(), | |
average_spectrum=self.get_average_spectrum().to_dict(), | |
) | |
def to_json(self, separate_motion=True): | |
'''Convert the properties to json. | |
Parameters | |
---------- | |
Refer to `Suite.to_dict`. | |
Returns | |
------- | |
`json` : str | |
''' | |
return json.dumps(self.to_dict(separate_motion=separate_motion)) | |
def save(self, separate_motion=True, rewrite=False): | |
'''Save the suite to a .suite file. | |
Parameters | |
---------- | |
`separate_motion` : boolean | |
If True, the motion details are not included into the file. | |
User has to make sure the motions are not moved. | |
`rewrite` : boolean | |
If True, all the '.motion' files will be rewritten. | |
otherwise, the motion files will be ignored. | |
Outputs | |
------- | |
Create a .suite file to store the suite. | |
''' | |
with open(f'{self.folder}/{self.name}.suite', 'w') as f: | |
f.write(self.to_json(separate_motion=separate_motion)) | |
if separate_motion: | |
for mo in self.motions: | |
if not os.path.exists(f'{mo.folder}/{mo.name}.motion'): | |
mo.save() | |
print(f'Motion saved to {mo.folder}/{mo.name}.motion') | |
else: | |
if rewrite: | |
mo.save() | |
print(f'Motion {mo.folder}/{mo.name}.motion is written.') | |
print(f'Saved to {self.folder}/{self.name}.suite') | |
@staticmethod | |
def read_dict(it): | |
'''Read the dictionary representing the properties of the class. | |
If key 'motions' is a instance of dict, then construct motions with dict. | |
else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file. | |
Parameters | |
---------- | |
`it` : dict | |
The dict generated by `Suite.to_dict` | |
Returns | |
------- | |
`suite` : `Suite` | |
''' | |
suite = Suite( | |
folder=it['folder'], | |
name=it['name'], | |
target_spectrum=Spectrum.read_dict(it['target_spectrum']), | |
period_bound=it['period_bound'] | |
) | |
for i, mo_content in enumerate(it['motions']): | |
if isinstance(mo_content, dict): | |
suite.add_motion(Motion.read_dict( | |
mo_content), it['factors'][i]) | |
else: | |
suite.add_motion(Motion.load( | |
f'{mo_content}.motion'), it['factors'][i]) | |
suite.period_samples = np.array(it['period_samples']) | |
suite.target_vector = suite.target_spectrum.get(suite.period_samples) | |
suite.reset_spectrum_matrix() | |
return suite | |
@staticmethod | |
def load(filename): | |
'''Load a the suite from a file. | |
Parameters | |
---------- | |
`filename` : str | |
File that stores the suite. | |
Returns | |
------- | |
`suite` : `Suite` | |
''' | |
if not filename.endswith('.suite'): | |
filename += '.suite' | |
with open(filename, 'r') as f: | |
return Suite.read_dict(json.loads(f.read())) | |
def copy(self): | |
'''Copy the instance. | |
Returns | |
------- | |
`suite` : `Suite` | |
''' | |
return Suite.read_dict(self.to_dict()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment