Skip to content

Instantly share code, notes, and snippets.

@Hanlin-Dong
Created April 9, 2020 20:56
Show Gist options
  • Save Hanlin-Dong/ef65ff3e2c9defefe931bab68df9fb99 to your computer and use it in GitHub Desktop.
Save Hanlin-Dong/ef65ff3e2c9defefe931bab68df9fb99 to your computer and use it in GitHub Desktop.
pyearthquake.py
#!/usr/bin/env python
'''
Introduction
============
`pyearthquake` is a light-weight module with basic earthquake engineering utilities.
The aim of this module is to help engineers play with earthquakes by python programming.
*Features*
--------
1. Convert .AT2 file downloaded from PEER database to nicely aligned form.
2. Select and match ground motions to the target spectrum with Monte Carlo simulation and Least Square optimization.
3. Beautify the ground motions by truncating, trimming, changing dt, and renaming.
4. Store a suite of motions in a single file or separate files.
5. Generate displacement spectrums and pesudo acceleration spectrums.
6. Inspect a suite or a motion with nice html interactive plots, thanks to [plotly](https://plot.ly).
7. Create Code Spectrums. Currently support Chinese Code.
Fact Sheet
----------
Version: 1.0
Author: [Hanlin Dong](http://www.hanlindong.com)
License: MIT
Last Update: 2020-04-08
Content Briefing
----------------
Currently, it has three main classes: `Spectrum`, `Motion`, and `Suite`
`Spectrum` : a class storing seismic spectrum.
Some static methods are created confining to Code Spectrums.
`Motion` : a class storing acceleration timehistory.
Motion has utility functions to nicely parse PEER motion file.
Motion also generates spectrums.
`Suite` : a class storing a suite of motions and their target spectrum.
Suite has utilities to load a PEER file folder.
Suite has optimization functions to select a bunch of earthquakes out of a large number of earthquakes
with Monte Carlo simulation and bounded least square optimization.
The advantage is that the shape of the accels is not changed unlike wavelet methods,
but the average match result is still good.
Suite can write all the acceleration files seperately or integrally.
Suite can plot all the accelerations and spectrums interactively for users to inspect.
Suite can generate .tcl file to be used in OpenSees.
All the classes mentioned above can be stored as string file.
Currently they are json files with ext: .spectrum, .motion, .suite.
These files can be read by static methods in every class.
Change log:
-----------
2020-04-08 13:53:32 v1.0
- Join the three classes to one single file, refactor some functions.
- Unify some methods, and create good documentation.
- Add plotly interactive plots.
'''
import os
import re
import json
import random
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.signal import lsim
from scipy.interpolate import interp1d
from scipy.optimize import least_squares
class Spectrum():
'''A class storing a spectrum.
It can be init-ed simply with (periods, spectrum) sampling points.
It can also be init-ed by static functions to generate Code-confined spectrums.
Parameters
----------
`periods` : List, numpy.ndarray
List of sampling periods.
It can be generated by the static function `Spectrum.periods_standard` .
Otherwise, users can specify desired period sampling points.
`spectrum` : List, numpy.ndarray
List of the spectrum points corresponding to sampling periods.
'''
def __init__(self, periods, spectrum):
self.periods = np.array(periods)
'''numpy.ndarray, sampling periods'''
self.spectrum = np.array(spectrum)
'''numpy.ndarray, spectrum corresponding to sampling periods.'''
def match(self, target, start, end, step=0.1):
'''Match the current spectrum with a target spectrum. Use Least Square method.
Parameters
----------
`target` : `Spectrum`
the spectrum to be matched.
`start`, `end` : float
the matching period boundaries.
`step` : float
evenly sample the periods with the step.
Returns
-------
`coefficient` : float
the coefficient to match the current spectrum to the target.
`srss` : float
the srss after match.
'''
periods = np.arange(start, end+step, step)
this_spectrum = np.array([self.get(p) for p in periods])
that_spectrum = np.array([target.get(p) for p in periods])
coefficient = sum(this_spectrum * that_spectrum) / sum(this_spectrum ** 2)
srss = (np.sum((this_spectrum * coefficient - that_spectrum) ** 2)) ** 0.5
return coefficient, srss
def get(self, period):
'''Get the spectrum value. Either a list or a number is accepted.
If the period exceeds the largest sampling period, the last spectrum value is returned.
The spectrum value not on sampling points are linearly interpolated.
Parameters
----------
`period` : number, list, numpy.ndarray
The interested period.
Returns
-------
`spectrum` : number, numpy.ndarray
if period is list-like, return a ndarray.
if period is a number, return a number.
'''
if not hasattr(period, '__iter__'):
i = self.periods.searchsorted(period)
if self.periods[i] == period:
return self.spectrum[i]
elif i >= len(self.periods):
print("Warning: given period exceeds the sampling range.")
return self.spectrum[-1]
else:
return self.spectrum[i] - (self.spectrum[i]-self.spectrum[i-1]) / (self.periods[i] - self.periods[i-1]) * (self.periods[i] - period)
else:
return np.array([self.get(p) for p in period])
def scale(self, factor_x=1, factor_y=1):
'''Scale the spectrum.
Parameters
----------
`factor_x` : number
scaling factor in x direction (periods)
`factor_y` : number
scaling factor in y direction (spectrum)
Returns
-------
`spectrum` : `Spectrum`
'''
return Spectrum(periods=self.periods*factor_x, spectrum=self.spectrum*factor_y)
def plot(self, filename=None, engine='pyplot'):
'''Plot the spectrum.
Parameters
----------
`filename` : string, None
if None, show it. Else, save it to the filename
`engine` : 'pyplot' or 'plotly'
Plot engine.
Outputs
-------
A file with name `filename`.png or `filename`.html will be generated.
'''
if engine == 'pyplot':
plt.figure(figsize=(4, 3))
plt.plot(self.periods, self.spectrum, marker='o')
plt.xlabel('Period (s)')
plt.ylabel('Spetrum')
plt.xlim([0, self.periods[-1]])
if filename is None:
plt.show()
else:
plt.savefig(filename+'.png', dpi=200)
elif engine == 'plotly':
fig = go.Figure()
fig.add_scatter(dict(
x='self.periods',
y='self.spectrum',
mode='lines+markers',
))
if filename is None:
fig.show()
else:
fig.write_html(filename+'.html')
else:
print('ERROR: engine should be in "pyplot" or "plotly"')
def to_dict(self):
'''Convert the instance properties to a dict.
Returns
-------
`it` : dict
'''
return dict(
periods=self.periods.tolist(),
spectrum=self.spectrum.tolist()
)
def to_json(self):
'''Convert the properties to json.
Returns
-------
`json` : string
'''
return json.dumps(self.to_dict())
def save(self, filename):
'''Save the spectrum to a file. Currently use json form.
Parameters
----------
`filename` : string
The file name to be saved.
The extension '.spectrum' will be automatically added.
Outputs
-------
A file with name `filename`.spectrum will be generated.
'''
with open(filename + '.spectrum', 'w') as f:
f.write(self.to_json())
def write_csv(self, filename):
'''Write the spectrum to a csv file.
Parameters
----------
`filename` : string
The extension '.csv' will be automatically added.
Outputs
-------
A file named `filename`.csv will be generated.
'''
np.savetxt(filename+'.csv', np.vstack(
[self.periods, self.spectrum]).T, fmt="%.3f,%.7f", header='Period,Spectrum')
@staticmethod
def read_csv(filename):
'''Create an instance by reading the written csv file.
Parameters
----------
`filename` : string
The filename to read.
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
if not filename.endswith('.csv'):
filename += '.csv'
data = np.loadtxt(filename, delimiter=',')
return Spectrum(periods=data[:, 0], spectrum=data[:, 1])
@staticmethod
def read_dict(it):
'''Create an instance from the dict.
Parameters
----------
`it` : dict
The dict generated by `Spectrum.to_dict`
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
return Spectrum(periods=it['periods'], spectrum=it['spectrum'])
@staticmethod
def read_json(string):
'''Create an instance from json string.
Parameters
----------
`string` : str
The json string
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
it = json.loads(string)
return Spectrum.read_dict(it)
@staticmethod
def load(filename):
'''Load the spectrum from file.
Parameters
----------
`filename` : str
Returns
-------
`spectrum` : `Spectrum`
New instance.
'''
if not filename.endswith('.spectrum'):
filename += '.spectrum'
with open(filename, 'r') as f:
string = f.read()
return Spectrum.read_json(string)
@staticmethod
def periods_standard(until=6):
'''Get a standard sampling of the periods.
The sampling points are more sparse with the increase of period.
Parameters
----------
`until` : number
The end of the period sampling list. It should >= 4.0.
Returns
-------
`periods` : numpy.ndarray
The standard sampling points.
'''
assert until >= 4
return np.hstack([np.arange(0, 0.2, 0.01),
np.arange(0.2, 0.5, 0.02),
np.arange(0.5, 1.0, 0.05),
np.arange(1.0, 2.0, 0.1),
np.arange(2.0, 4.0, 0.2),
np.arange(4.0, until+0.5, 0.5)])
@staticmethod
def chinese_point(period, intensity, major, site, group, damping=0.05):
'''Get spectrum point for Chinese code, given a period.
Parameters
----------
`period` : number
The given period.
`intensity` : number
Intensity represented by a number.
Use 7.5 or 8.5 for half-level intensity.
`major` : boolean
If True, it's for major (rare) earthquake.
If False, for minor (frequent) earthquake.
`site` : int
Site number (Roman number in the code. 0 for I0, 1 for I1.)
`group` : int
Site class (Chinese number in the code.)
`damping` : float
Spectrum damping.
Returns
-------
`value` : float
the specific spectrum point value corresponding to the period.
'''
# alpha max
if intensity == 6:
alphamax = 0.28 if major else 0.04
elif intensity == 7:
alphamax = 0.50 if major else 0.08
elif intensity == 7.5:
alphamax = 0.72 if major else 0.12
elif intensity == 8:
alphamax = 0.9 if major else 0.16
elif intensity == 8.5:
alphamax = 1.2 if major else 0.24
elif intensity == 9:
alphamax = 1.40 if major else 0.32
else:
raise("ERROR: intensity not in [6, 7, 7.5, 8, 8.5, 9]")
# tg
tg_table = [[0.20, 0.25, 0.35, 0.45, 0.65],
[0.25, 0.3, 0.4, 0.55, 0.75],
[0.3, 0.35, 0.45, 0.65, 0.9]]
assert group in [1, 2, 3] and site in [0, 1, 2, 3, 4]
tg = tg_table[group-1][site]
# other parameters
gamma = 0.9 + (0.05 - damping) / (0.3 + 6*damping)
eta1 = 0.02 + (0.05 - damping) / (4 + 32*damping)
eta2 = 1 + (0.05 - damping) / (0.08 + 1.6*damping)
# calculate
if period < 0.1:
return ((10*eta2 - 4.5) * period + 0.45) * alphamax
elif period < tg:
return eta2 * alphamax
elif period < 5*tg:
return (tg/period)**gamma * eta2 * alphamax
elif period <= 6:
return (eta2 * 0.2**gamma - eta1 * (period-5*tg)) * alphamax
else:
return (eta2 * 0.2**gamma - eta1 * (6-5*tg)) * alphamax
@staticmethod
def chinese(intensity=8, major=True, site=3, group=2, damping=0.05):
'''Return a new spectrum instance that confines to Chinese Code.
Use `Spectrum.periods_standard` to get sampling periods.
Parameters
----------
refer to `Spectrum.chinese_point`
Returns
-------
`spectrum` : `Spectrum`
'''
periods = Spectrum.periods_standard(until=6)
spectrum = [Spectrum.chinese_point(p,
intensity=intensity,
major=major,
site=site,
group=group,
damping=damping) for p in periods]
return Spectrum(periods, spectrum)
class Motion(object):
'''A class plays with ground motion. Store accel time history and accel spectrum only.
'''
def __init__(self, folder, name, damping=0.05):
'''
Parameters
----------
`folder` : str
the folder this motion is stored.
`name` : str
The motion name. The motion will be saved to '{folder}/{name}.motion'
`damping` : float
The damping ratio of the motion spectrum.
'''
self.folder = folder
'''str. See `Motion`.'''
os.makedirs(folder, exist_ok=True)
self.name = name
'''str. See `Motion`.'''
self.accel = None
'''numpy.ndarray. Acceleration time history with evenly distributed sampling points.'''
self.dt = None
'''float. Acceleration time sampling delta t.'''
self.information = ''
'''str. Store massive infomation about the motion. '''
self.spectrum = None
'''`Spectrum`. The spectrum of the motion.'''
self.damping = damping
'''float. The damping ratio of the spectrum. Note: init with 0.05.'''
def get_PGA(self):
'''
Returns
-------
`value` : float
The peak ground acceleration.
'''
return np.max(abs(self.accel))
def get_duration(self):
'''
Returns
-------
`value` : float
The duration of the motion.
'''
return (len(self.accel) - 1) * self.dt
def get_times(self):
'''
Returns
-------
`value` : numpy.ndarray
The time series of the motion
'''
return np.arange(len(self.accel)) * self.dt
def generate_spectrum(self, periods=None):
'''Use scipy LTS signal processing module to generate the spectrum.
Note that the accel spectrum is pseudo-spectrum derived from disp spectrum.
So it's only accurate when damping is small.
If `Motion.dt` is still None, which usually occurs if the PEER file is not well formated,
a blank spectrum will be returned.
Parameters
----------
`periods` : List-like, None
The period sampling points.
If is None, `Spectrum.periods_standard` will be called.
Returns
-------
`spectrum` : `Spectrum`
The generated spectrum.
Outputs
-------
`Motion.spectrum` will be changed.
'''
if periods is None:
periods = Spectrum.periods_standard()
if self.dt is not None:
print("Generating spectrum ...")
omegas = 2 * np.pi / periods[1:]
num = np.array([-1])
spectrum = np.zeros(len(periods))
spectrum[0] = self.get_PGA()
for i, omega in enumerate(omegas):
den = np.array([1, 2*self.damping*omega, omega**2])
_, y, _ = lsim((num, den), self.accel, self.get_times())
spectrum[i+1] = np.max(abs(y)) * omega ** 2
self.spectrum = Spectrum(periods, spectrum)
else:
print("Dt is not known. Assigning empty spectrum.")
self.spectrum = Spectrum(periods=[], spectrum=[])
return self.spectrum
def load_peer(self, peer_filename):
'''Load acceleration time-history files downloaded from PEER.
The strings in the file will be parsed and also stored in self.information.
Parameters
----------
`peer_filename` : str
Outputs
-------
`Motion.dt`, `Motion.accel`, `Motion.information`, `Motion.spectrum` will all be changed.
'''
self.information += f'peer_filename: {peer_filename}'
try:
f = open(peer_filename, 'r')
for _ in range(3):
self.information += f.readline()
s = f.readline()
self.information += s
re_dt = re.compile(r".*[ 0](\.[0-9]*) ?SEC")
mt = re_dt.match(s)
try:
self.dt = float(mt.group(1).replace('.', '0.'))
print(f'Extracted dt = {self.dt:.4f}')
except:
print(f'ERROR: cannot find dt from {s}')
self.dt = 0.000001
series = []
datastring = f.readlines()
for dtline in datastring:
for dt in dtline.split(" "):
if dt:
try:
series.append(float(dt.replace(".", "0.")))
except ValueError:
break
self.accel = np.array(series)
print(f'Extracted npts = {len(self.accel)}')
except IOError:
print(f'No file named {self.name} is found.')
def convert_peer(self, peer_filename):
'''A recipe to convert the peer acceleration file to the .motion file.
Parameters
----------
`peer_filename` : str
`folder` : str
the folder to save .motion file.
Outputs
-------
A .motion file will be created in `folder` with the PEER filename.
'''
print(f'Converting PEER file {peer_filename} to folder {self.folder} with name {self.name}')
self.load_peer(peer_filename)
self.generate_spectrum()
self.save()
def trim(self, new_name, new_folder=None, left=1.0e-3, right=1.0e-3, prepend_zero=True, log=True, log_folder=None):
'''Trim the motion with two coefficients of PGA.
This method is used when a long near-zero time-history is stored in the data file.
Parameters
----------
`new_name` : str
`new_folder` : str, None
`left`, `right` : float
The coefficient of PGA to trim the time history.
Search from left or right, when the target point is found,
search for the nearest near-zero point. Then trim off the small values.
`prepend_zero` : boolean
If True, prepend a zero value to the trimmed acceleration.
`log` : boolean
If the log picture will be generated.
`log_folder` : str, None
Path for the folder to save the trim logs.
If None, save in the previous folder.
Returns
-------
`motion` : `Motion`
The trimmed motion.
Outputs
-------
If log, plot a figure in log_folder named '{new_name}_trimlog.png'
'''
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
pga = self.get_PGA()
# search left
i = 0
while abs(self.accel[i]) < pga * left:
i += 1
while i > 0 and self.accel[i] * self.accel[i-1] > 0:
i -= 1
i_left = i
# search right
i = -1
while abs(self.accel[i]) < pga * right:
i -= 1
while i < -1 and self.accel[i] * self.accel[i+1] > 0:
i += 1
i_right = len(self.accel) + i
if prepend_zero:
mo.accel = np.hstack([0, self.accel[i_left:i_right]])
else:
mo.accel = self.accel[i_left:i_right]
mo.generate_spectrum()
print(
f'Motion is trimmed from {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
if log:
log_folder = self.folder if log_folder is None else log_folder
os.makedirs(log_folder, exist_ok=True)
_, ax = plt.subplots(3, 1, figsize=(7, 5))
ax[0].plot(self.get_times(), self.accel, label='before')
ax[1].plot(mo.get_times(), mo.accel, label='after')
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
for i in range(3):
ax[i].legend()
plt.suptitle(f'Trim log: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
plt.savefig(f'{log_folder}/{mo.name}_trimlog.png', dpi=200)
return mo
def change_dt(self, dt, new_name, new_folder=None, log=True, log_folder=None):
'''Change dt for the accel. Linearly interpolate the acceleration, and re-sample.
Note: no low-pass filters are used. May cause new peaks in the Fourier spectrum.
Parameters
----------
`dt` : float
New sampling delta t.
`new_name` : str
`new_folder` : str, None
`log` : boolean
If log, create a log picture comparing the before and after accels and spectrum.
`log_folder` : str, None
Path for the folder to save the trim logs.
If None, save in the previous folder.
Returns
-------
`motion` : `Motion`
Outputs
-------
If log, create a picture named '{log_folder}/{new_name}_dtlog.png'.
'''
if dt == self.dt:
print(f'Motion {self.name} do not need to change dt')
return self.copy()
else:
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
f = interp1d(self.get_times(), self.accel)
npts = np.floor(self.dt * len(self.accel) / dt)
mo.accel = f(np.arange(npts)*dt)
mo.dt = dt
mo.generate_spectrum()
print(f'Motion dt changed from {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
if log:
log_folder = self.folder if log_folder is None else log_folder
os.makedirs(log_folder, exist_ok=True)
_, ax = plt.subplots(3, 1, figsize=(7, 5))
ax[0].plot(self.get_times(), self.accel, label='before')
ax[1].plot(mo.get_times(), mo.accel, label='after')
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
for i in range(3):
ax[i].legend()
plt.suptitle(f'Change dt: {self.name}:{self.dt:.4f}s to {mo.name}:{dt:.4f}s.')
plt.savefig(f'{log_folder}/{mo.name}_dtlog.png', dpi=200)
return mo
def truncate(self, time, new_name, new_folder=None, log=True, log_folder=None):
'''Truncate the acceleration.
Sometimes, a ground motion file may have two peaks or more.
So if only one peak is needed, this method can be used to skip the other peaks.
Parameters
----------
`time` : float
The accels after the time will be cut off.
`new_name` : str
`new_folder` : str, None
`log` : boolean
`log_folder` : str, None
Returns
-------
`motion` : `Motion`
Outputs
-------
If log is True, a figure named '{log_folder}/{new_name}_trunclog.png' will be created.
'''
i = round(time / self.dt)
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
if i >= len(self.accel):
print(f"WARNING: time is longer than duration. {self.name} is not truncated.")
return mo
mo.accel = self.accel[:i]
mo.generate_spectrum()
print(f'Motion truncated: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
if log:
log_folder = self.folder if log_folder is None else log_folder
os.makedirs(log_folder, exist_ok=True)
_, ax = plt.subplots(3, 1, figsize=(7, 5))
ax[0].plot(self.get_times(), self.accel, label='before')
ax[1].plot(mo.get_times(), mo.accel, label='after')
ax[2].plot(self.spectrum.periods, self.spectrum.spectrum, label='before')
ax[2].plot(mo.spectrum.periods, mo.spectrum.spectrum, label='after')
for i in range(3):
ax[i].legend()
plt.suptitle(f'Truncate: {self.name}:{len(self.accel)*self.dt:.3f}s to {mo.name}:{len(mo.accel)*mo.dt:.3f}s.')
plt.savefig(f'{log_folder}/{mo.name}_trunclog.png', dpi=200)
return mo
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1):
'''Scale the ground motion.
Parameters
----------
`new_name` : str
`new_folder` : str, None
`factor_x` : float
The scaling factor on the time/period axis.
`factor_y` : float
The scaling factor on the accel axis.
Returns
-------
`motion` : Motion
'''
mo = self.copy()
mo.name = new_name
mo.folder = new_folder if new_folder is not None else self.folder
if factor_x != 1:
mo.dt = self.dt * factor_x
if factor_y != 1:
mo.accel = self.accel * factor_y
mo.spectrum = self.spectrum.scale(factor_x, factor_y)
return mo
def plot_timehistory(self, folder=None, save=True, engine='pyplot'):
'''Plot the time history of the acceleration.
Parameters
----------
`folder` : str, None
Store folder. If None, use `Motion.folder`
`save` : boolean
Save the figure or not.
`engine` : 'pyplot' or 'plotly'
Outputs
-------
if save is True, create a figure '{folder}/{self.name}_th'.
'''
folder = self.folder if folder is None else folder
if engine == 'pyplot':
plt.figure()
t = self.get_times()
plt.plot(t, self.accel)
if save:
plt.savefig(f'{folder}/{self.name}_th.png', dpi=200)
else:
plt.show()
elif engine == 'plotly':
fig = go.Figure()
fig.add_trace(go.Scatter(
x=self.get_times(),
y=self.accel,
mode='lines',
name=self.name
))
if save:
fig.write_html(f'{folder}/{self.name}_th.html')
else:
fig.show()
else:
print('ERROR: engine should be in "pyplot" or "plotly"')
def plot(self, folder=None, save=True, engine='pyplot'):
'''Plot the time history and the spectrum.
Parameters
----------
See `Motion.plot_timehistory`
Outputs
-------
Create a figure named '{folder}/{name}'
'''
folder = self.folder if folder is None else folder
if engine == 'pyplot':
_, ax = plt.subplots(2, 1, figsize=(7, 8))
ax[0].plot(self.get_times(), self.accel)
ax[0].set_xlabel('Time(s)')
ax[0].set_ylabel('Acceleration(g)')
ax[0].set_title(f'PGA={self.get_PGA():.4f}')
ax[1].plot(self.spectrum.periods, self.spectrum.spectrum)
ax[1].set_xlabel('Period(s)')
ax[1].set_ylabel('Spectrum Acceleration(g)')
ax[1].set_title(f'damping={self.damping:.4f}')
plt.subplots_adjust(hspace=0.3)
if save:
plt.savefig(f'{folder}/{self.name}.png', dpi=200)
else:
plt.show()
elif engine == 'plotly':
fig = make_subplots(rows=1, cols=2)
fig.add_trace(go.Scatter(
x=self.get_times(),
y=self.accel,
mode='lines',
name='accel'
), row=1, col=2)
fig.add_trace(go.Scatter(
x=self.spectrum.periods,
y=self.spectrum.spectrum,
mode='lines',
name='spectrum'
), row=1, col=1)
if save:
fig.write_html(f'{folder}/{self.name}.html')
else:
fig.show()
else:
print('Engine should be in "pyplot" or "plotly"')
def write_accel(self, filename, time=True):
'''Write the acceleration timehistory to a single file.
Parameters
----------
`filename` : str
The file path to store the accelration.
`time` : boolean
If True, a time column is created.
To know dt in the file content, time should be True.
Otherwise, dt should be stored in the filename.
Outputs
-------
An text file with acceleration information will be written.
'''
if time:
data = np.vstack([self.get_times(), self.accel])
np.savetxt(filename, data.T, fmt="%.4f,%.7e", header="Time,Accel")
else:
np.savetxt(filename, self.accel, fmt="%.7e")
def write_info(self, filename):
'''Write the information to a single file.
Parameters
----------
`filename` : str
Outputs
-------
A text file with information will be created.
'''
with open(filename, 'w') as f:
f.write(f'dt={self.dt:.3}\n')
f.write(f'npts={len(self.accel)}\n')
f.write(f'damping={self.damping:.3}\n')
f.write(self.information)
def write_spectrum(self, filename):
'''Write the spectrum to a single file. Use the form of csv.
Parameters
----------
`filename` : str
Outputs
-------
A csv file will be created.
'''
self.spectrum.write_csv(filename)
def read_accel(self, filename, dt=True):
'''Load an single-column acceleration file to the current instance.
Parameters
----------
`dt` : float, boolean
if dt is float, `Motion.dt` will be set to dt.
Else if dt is True, `Motion.dt` will be found in the file.
Outputs
-------
Change `Motion.dt` and `Motion.accel` in the CURRENT instance.
'''
if not isinstance(dt, float):
data = np.loadtxt(filename)
self.accel = data[:, 1]
self.dt = data[1, 0]
else:
self.accel = np.loadtxt(filename)
self.dt = dt
def read_spectrum(self, filename):
'''Read the spectrum from csv file.
Parameters
----------
`filename` : str
Outputs
-------
The `Motion.spectrum` of the CURRENT instance will be changed.
'''
spectrum = Spectrum.read_csv(filename)
self.spectrum = spectrum
def read_info(self, filename):
'''Read info from text file.
Parameters
----------
`filename` : str
Outputs
-------
The `Motion.information` of the CURRENT instance will be changed.
'''
with open(filename, 'r') as f:
self.dt = float(f.readline().replace("dt=", ""))
_ = int(f.readline().replace("npts=", ""))
self.damping = float(f.readline().replace("damping=", ""))
self.information = f.read()
def to_dict(self):
'''Convert the instance properties to dict.
Returns
-------
`it` : dict
'''
return dict(
dt=self.dt,
npts=len(self.accel),
damping=self.damping,
information=self.information,
accel=self.accel.tolist(),
spectrum=self.spectrum.to_dict(),
folder=self.folder,
name=self.name,
)
def to_json(self):
'''Convert the instance properties to string using json format.
Returns
-------
`string` : str
'''
return json.dumps(self.to_dict())
def save(self):
'''Save the instance to a file, which is defined by `Motion.folder` and `Motion.name`
Outputs
-------
A file named '{self.folder}/{self.name}.motion' will be created.
'''
with open(f'{self.folder}/{self.name}.motion', 'w') as f:
f.write(self.to_json())
@staticmethod
def read_dict(it):
'''Create a instance by the dict generated by `Motion.to_dict`.
Parameters
----------
`it` : dict
Returns
-------
`mo` : `Motion`
'''
mo = Motion(folder=it['folder'], name=it['name'])
mo.dt = it['dt']
mo.damping = it['damping']
mo.information = it['information']
mo.accel = np.array(it['accel'])
mo.spectrum = Spectrum.read_dict(it['spectrum'])
return mo
@staticmethod
def read_json(string):
'''Create a new instance by the string generated by `Motion.to_json`.
Parameters
----------
`string` : str
Returns
-------
`mo` : `Motion`
'''
it = json.loads(string)
return Motion.read_dict(it)
@staticmethod
def load(filename):
'''Load the file created from `Motion.save` method.
Parameters
----------
`filename` : str
Returns
-------
`mo` : `Motion`
'''
if not filename.endswith('.motion'):
filename += '.motion'
with open(filename, 'r') as f:
return Motion.read_json(f.read())
def copy(self):
'''Copy the instance.
Returns
-------
`mo` : `Motion`
'''
return Motion.read_dict(self.to_dict())
class Suite(object):
'''A class that contains mainly a series of `Motion` instances,
and corresponding series of factors, matching the suite of motions to the target `Spectrum`.
'''
def __init__(self, folder, name, target_spectrum, period_bound):
'''
Parameters
----------
`folder` : str
A path-like string for the suite.
`name` : str
A specific name for the suite. The instance will be saved in '{folder}/{name}.suite'
`target_spectrum` : `Spectrum`
The target spectrum of the suite.
`period_bound` : List<float> len=2
The matching period boundaries.
'''
self.folder = folder
'''str. See `Suite`'''
os.makedirs(folder, exist_ok=True)
self.name = name
'''str. See `Suite`'''
self.motions = []
'''List<`Motion`>. Store all the motions.'''
self.factors = []
'''List<float>. Store all the factors that should be applied to the motions.'''
self.target_spectrum = target_spectrum
'''`Spectrum`. See `Suite`'''
self.period_bound = period_bound
'''List<float>. See `Suite`'''
self.period_samples = np.linspace(period_bound[0], period_bound[1], 200)
'''numpy.ndarray. The period within the `period_bound` is evenly sampled, with 200 points.'''
self.spectrum_matrix = None
'''numpy.ndarray. A matrix with shape (len(`Suite.period_samples`), len(`Suite.motions`)).
This matrix times the `Suite.factors` vector products the average spectrum with `Suite.period_samples`.'''
self.target_vector = target_spectrum.get(self.period_samples)
'''numpy.ndarray. The vector generated by the target_spectrum sampling the `Suite.period_samples`'''
def set_folder(self, folder):
'''Reset the folder for the suite and the motions.
Parameters
----------
`folder` : str
Outputs
-------
`Suite.folder` and `Suite.motions` will be changed.
'''
if folder is not None:
self.folder = folder
os.makedirs(folder, exist_ok=True)
for mo in self.motions:
mo.folder = folder
def add_motion(self, motion, factor=1):
'''Add motion to the suite together with a factor, to keep the lengths are identical.
Parameters
----------
`motion` : `Motion`
`factor` : number
Outputs
-------
`Suite.motions` and `Suite.factors` will be changed.
'''
self.motions.append(motion)
self.factors.append(factor)
def load_motions_from_names(self, names, folder=None):
'''Give a list of name and the folder, load motions.
Parameters
----------
`names` : List<str>
The list of motion names.
`folder` : str, None
The folder of the motions.
If is None, use `Suite.folder`
Outputs
-------
`Suite.motions` and `Suite.factors` will be changed.
'''
folder = self.folder if folder is None else folder
for name in names:
mo = Motion.load(f'{folder}/{name}.motion')
self.add_motion(mo)
print(f'Loaded motion {mo.name}')
self.reset_spectrum_matrix()
def load_motions_from_folder(self, folder=None):
'''Find all the .motion file from the given folder, and load them.
Parameters
----------
`folder` : str, None
The folder to search the .motion files.
If is None, use `Suite.folder` instead.
Outputs
-------
`Suite.motions` and `Suite.factors` will be changed.
'''
folder = self.folder if folder is None else folder
filenames = os.listdir(self.folder)
for filename in filenames:
if filename.endswith('.motion'):
mo = Motion.load(f'{folder}/{filename}')
self.add_motion(mo)
print(f'Loadad motion {mo.name}')
self.reset_spectrum_matrix()
def filter_by_IDs(self, ids, new_name, new_folder=None):
'''Give a series of selected IDs, filter the motions.
Parameters
----------
`ids` : List<int>
The list of indexes of the selected motions in the current suite.
`new_name` : str
`new_folder` : str, None
Returns
-------
`suite` : `Suite`
A new suite with the selected motions and their factors only.
'''
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
suite.motions = [self.motions[i] for i in ids]
suite.factors = [self.factors[i] for i in ids]
suite.reset_spectrum_matrix()
print(f'The motions have been filtered. {len(ids)} motions remain.')
return suite
def filter_by_file(self, filename, new_name, new_folder=None):
'''Give a file with indexes selected.
Parameters
----------
Refer to `Suite.filter_by_IDs`.
Returns
-------
Refer to `Suite.filter_by_IDs`.
'''
with open(filename, 'r') as f:
lines = f.readlines()
ids = [int(l) for l in lines]
return self.filter_by_IDs(ids=ids, new_name=new_name, new_folder=new_folder)
def get_average_spectrum(self):
'''Return a spectrum which is the average spectrum of the suite.
Outputs
-------
`spectrum` : `Spectrum`
'''
periods = self.motions[0].spectrum.periods
average_spectrum = self.motions[0].spectrum.spectrum * self.factors[0]
for i in range(1, len(self.motions)):
average_spectrum += self.motions[i].spectrum.get(periods) * self.factors[i]
average_spectrum /= len(self.motions)
return Spectrum(periods, average_spectrum)
def get_srss(self):
'''Get the srss of the current averate spectrum to the target spectrum in the period boundary.
Returns
-------
`srss` : float
'''
return np.linalg.norm(np.matmul(self.spectrum_matrix, self.factors) - self.target_vector).item()
def reset_spectrum_matrix(self):
'''Reset the spectrum matrix, which produces the average spectrum
in the boundary by multiplying factors vector.
This method should be called everytime `Suite.motions` change.
Outputs
-------
`Suite.spectrum_matrix` and `Suite.target_vector` will be changed.
'''
mat = np.zeros((len(self.period_samples), len(self.motions)))
for i in range(len(self.motions)):
spectrum = self.motions[i].spectrum.get(self.period_samples)
mat[:, i] = spectrum / len(self.motions)
self.spectrum_matrix = mat
self.target_vector = self.target_spectrum.get(self.period_samples)
def eliminate_neg(self):
'''Eliminate negative target vector by multiplying a factor for every individual.
In other words, make the average spectrum above the target spectrum.
Returns
-------
`factor` : float
The global factor that should be applied to each of the factors.
Outputs
-------
`Suite.factors` are all amplified by the returned `factor`.
'''
error = np.matmul(self.spectrum_matrix, self.factors) - self.target_vector
argmin = np.argmin(error)
factor = 1 / (1 + error[argmin] / self.target_vector[argmin])
print(f'Multiply {factor:.4} to eliminate negative spectrum.')
self.factors = [self.factors[i] * factor for i in range(len(self.factors))]
return factor
def match_individual_LSQ(self):
'''Use Least square method for each individual motion to match the spectrum.
Returns
-------
`residuals` : numpy.ndarray
srss of each motion matched to the target spectrum.
Outputs
-------
`Suite.factors` are changed.
'''
residuals = np.zeros(len(self.motions))
for i in range(len(self.motions)):
this = self.spectrum_matrix[:, i] * len(self.motions)
that = self.target_vector
factor = np.dot(this, that) / np.dot(this, this)
residual = np.linalg.norm(this * factor - that)
self.factors[i] = factor
residuals[i] = residual
print(f"Individual LSQ: Factor Max={np.max(self.factors):.4}, Residual Max={np.max(residuals):.4}")
return residuals
def filter_optimize(self, count, times, use_lsq=True, output_count=0, new_folder=None, lower_bound=0.6, upper_bound=1.4, dimension=100000):
'''Use Monte Carlo simulation and Least Square method to optimize the suite.
after running, a bunch of suites will be written to the new_folder.
Parameters
----------
`count` : int
The number of motions in the resulting suite.
`times` : int
The number of times that MC simulation with dimension is run.
The total number of samples of MC simulation will be {times} * {dimension}.
`use_lsq` : boolean
Whether use LSQ optimizer to better optimize the suite.
`output_count` : int
The number of outputs. if 0, output all.
`new_folder` : str
`lower_bound`, `upper_bound` : float
For the purpose that the individual motions are not too far from the individually matched factors,
the lower and upper boundaries of amplifying the individually matched factors are set.
This is only useful when `use_lsq` is set, to add boundaries to the LSQ optimizer.
`dimension` : int
Each time the Monte Carlo simulation is run,
means that {dimension} times of simulation have been done by using a matrix target.
Returns
-------
List<`Suite`> with len=`output_count`.
The list is sorted. The best answer is the first result. However, visual check should be done.
Outputs
-------
Every running after Monte Carlo simulation and LSQ optimization, the spectrum of the suite will be plotted.
'''
new_folder = new_folder if new_folder is not None else self.folder
os.makedirs(new_folder, exist_ok=True)
suites = []
srsses = []
for i in range(times):
suite_new = self.filter_montecarlo(count=count, new_name=f'mc{i:03d}', new_folder=new_folder)
suites.append(suite_new)
srsses.append(suite_new.get_srss())
if use_lsq:
suite_new2 = suite_new.optimize_LSQ(
new_name=f'lsq{i:03d}',
new_folder=new_folder,
lower_bound=lower_bound,
upper_bound=upper_bound
)
suites.append(suite_new2)
srsses.append(suite_new2.get_srss())
argsorted = np.argsort(np.array(srsses))
results = [suites[i] for i in argsorted[:output_count]]
for i, suite in enumerate(results):
suite.name = f'no{i}-{suite.name}'
suite.save()
suite.plot_all_spectrums(engine='pyplot')
return results
def filter_montecarlo(self, count, new_name, new_folder=None, dimension=100000):
'''Use Monte Carlo simulation to filter the suite to count numbers of motions.
Parameters
----------
Refer to `Suite.filter_optimize`.
Returns
-------
`suite` : `Suite`
The optimized result after {dimension} times of MC simulations.
'''
print(f'Running Monte Carlo simulation, dimension={dimension}')
self.match_individual_LSQ()
factors = np.array(self.factors)
xs = np.zeros((len(self.motions), dimension))
for i in range(dimension):
indexes = random.sample(range(len(self.motions)), count)
xs[indexes, i] = factors[indexes]
mul = np.matmul(self.spectrum_matrix * len(self.motions) / count, xs)
loss = mul - self.target_vector.reshape(-1, 1)
argmin = np.argmin(loss, axis=0)
lossmin = np.min(loss, axis=0)
target_at_lossmin = self.target_vector[argmin]
amps = 1 / (lossmin / target_at_lossmin + 1)
lossAmp = mul * amps - self.target_vector.reshape(-1, 1)
srss = np.linalg.norm(lossAmp, axis=0)
arg = np.argmin(srss)
factors = xs[:, arg] * amps[arg]
suvives = np.where(factors > 1.0e-3)
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
suite.motions = [self.motions[i] for i in suvives[0]]
suite.factors = [factors[i] for i in suvives[0]]
suite.reset_spectrum_matrix()
return suite
def _loss(self, x):
'''Utility function: calculate the loss for optimization.'''
factors = x * np.array(self.factors)
mul = np.matmul(self.spectrum_matrix, factors)
loss = mul - self.target_vector
argmin = np.argmin(loss)
amp = 1 / (loss[argmin] / self.target_vector[argmin] + 1)
return mul * amp - self.target_vector
def optimize_LSQ(self, new_name, new_folder=None, lower_bound=0.6, upper_bound=1.4):
'''Use Least Square method from scipy to optimize the factors.
Parameters
----------
Refer to `Suite.filter_optimize`.
Returns
-------
`suite` : `Suite`
Optimized suite.
'''
print('Running Bounded Least Square optimization ...')
res = least_squares(self._loss,
np.ones(len(self.motions)),
bounds=(lower_bound, upper_bound),
verbose=0)
factors = res.x * self.factors
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
suite.factors = factors.tolist()
suite.eliminate_neg()
return suite
def load_peer_folder(self, peer_folder, h1=True, h2=True, v=False):
'''From a folder load all the PEER motions into the suite.
Extract the downloaded peer zip file first.
Parameters
----------
`peer_folder` : str
The folder path of the extracted PEER data. Where `_SearchResults.csv` can be found.
`h1`, `h2`, `v` : boolean
Whether load the three directional files or not.
Outputs
-------
`Suite.motions`, `Suite.factors` will be changed.
All the PEER motions will be saved in .motion format in the new folder.
'''
with open(f'{peer_folder}/_SearchResults.csv', "r") as f:
for _ in range(34):
f.readline()
i = 0
positions = []
if h1:
positions.append(19)
if h2:
positions.append(20)
if v:
positions.append(21)
line = f.readline()
while line != '\n':
cells = line.split(',')
for pos in positions:
filename = cells[pos].replace(' ', '')
motion_name = filename.replace('.AT2', '')
print(f'Loading {motion_name}')
mo = Motion(folder=self.folder, name=motion_name)
mo.load_peer(peer_filename=f'{peer_folder}/{filename}')
mo.generate_spectrum()
mo.save()
self.add_motion(mo)
i += 1
line = f.readline()
self.reset_spectrum_matrix()
print(f"Successfully loaded {i} earthquakes from peer folder {peer_folder} to {self.folder}.")
return True
def beautify(self, new_name, run_steps=[1,2,3,4], dt=0.01, new_folder=None, trunc_dict={}, left=1.0e-3, right=1.0e-3, prepend_zero=True):
'''Beautify the suite in 4 steps:
1. truncate the motions, see `Motion.truncate`
2. trim the near-zero parts, see `Motion.trim`
3. change dt to uniform, see `Motion.change_dt`
4. rename the motions to a series.
This procedure is always logged to the current folder.
Parameters
----------
`new_name` : str
`run_steps` : List<int>
The step number to run. Users can select which one out of the 4 steps to run.
The order will also follow this List.
`dt` : float
The uniform delta t.
`new_folder` : str
`trunc_dict` : dict
A dict that defines the `time` parameter in `Motion.truncate`
In the dict, keys are the indexes of the motions in the suite, values are the `time` values.
`left`, `right` : float
The parameters of PGA to trim. Refer to `Motion.trim`.
Returns
-------
`suite` : `Suite`
Outputs
-------
All the logs and the pre- and post- beautify average spectrum.
'''
suite = self.copy()
spectrum_hist = suite.get_average_spectrum()
suite.plot_all_spectrums(engine='pyplot')
for i, motion in enumerate(self.motions):
mo = motion.copy()
mo.name = f'{self.name}-{i}-b'
for step in run_steps:
if step == 1:
if i in trunc_dict:
mo = mo.truncate(time=trunc_dict[i], new_name=mo.name+'1')
elif step == 2:
mo = mo.trim(new_name=mo.name+'2', left=left, right=right)
elif step == 3:
mo = mo.change_dt(dt=dt, new_name=mo.name+'3')
elif step == 4:
mo.name = f'{i:02}' if len(self.motions) <= 100 else f'{i:03}'
else:
print(f'step={step} is not recognized and ignored.')
suite.motions[i] = mo
spectrum_new = suite.get_average_spectrum()
plt.figure()
plt.plot(spectrum_hist.periods, spectrum_hist.spectrum, label='before')
plt.plot(spectrum_new.periods, spectrum_new.spectrum, label='after')
plt.plot(suite.target_spectrum.periods, suite.target_spectrum.spectrum, label='target')
plt.legend()
plt.suptitle(f'Beautify: {self.name} -> {new_name}')
plt.savefig(f'{self.folder}/{self.name}-beautify.png', dpi=200)
suite.name = new_name
if new_folder is not None:
suite.set_folder(new_folder)
return suite
def scale(self, new_name, new_folder=None, factor_x=1, factor_y=1, scale_target_spectrum=True):
'''Scale the motions both horizontally and vertically.
Parameters
----------
`new_name` : str
`new_folder` : str, None
`factor_x`, `factor_y` : float
The factors to amplify in x axis and y axis.
`scale_target_spectrum` : boolean
Whether scale the target spectrum as well.
Returns
-------
`suite` : `Suite`
'''
suite = self.copy()
suite.name = new_name
suite.set_folder(new_folder)
for i, motion in enumerate(self.motions):
suite.motions[i] = motion.scale(
new_name=motion.name, factor_x=factor_x, factor_y=factor_y)
if scale_target_spectrum:
suite.target_spectrum = self.target_spectrum.scale(
factor_x=factor_x, factor_y=factor_y)
if factor_x != 1:
suite.period_bound = [p * factor_x for p in self.period_bound]
suite.period_samples = self.period_samples * factor_x
if factor_y != 1:
suite.factors = [f*factor_y for f in self.factors]
suite.reset_spectrum_matrix()
return suite
def write_TCL(self, folder=None, filename=None):
'''Write the suite to TCL forms for OpenSees.
Currently the TCL contains only one dict.
The keys of the dict is the motion names. Then the essential properties are stored.
At the same time, the accelerations for each motion is written to a text file.
Parameters
----------
`folder` : str, None
The folder to write the tcl files.
If is None, use `Suite.folder`.
`filename` : str, None
The tcl file name. If is None, use `Suite.name`.
Outputs
-------
A tcl file and a series of acceleration files.
'''
folder = self.folder if folder is None else folder
filename = self.name if filename is None else filename.replace(
'.tcl', '') + '.tcl'
os.makedirs(folder, exist_ok=True)
config = ''
config += 'dict set motion keys {'
names = [mo.name for mo in self.motions]
config += ' '.join(names)
config += '}\n\n'
for i, mo in enumerate(self.motions):
mo.write_accel(f'{folder}/{mo.name}', time=False)
config += f'dict set motion {mo.name} path {folder}/{mo.name}.acc\n'
config += f'dict set motion {mo.name} npts {len(mo.accel)}\n'
config += f'dict set motion {mo.name} dt {mo.dt:.4f}\n'
config += f'dict set motion {mo.name} amp {self.factors[i]:.4f}\n\n'
with open(f'{folder}/{filename}.tcl', 'w') as f:
f.write(config)
def plot_all_accels(self, folder=None, filename=None, save=True, engine='pyplot'):
'''Plot amplified accelograms to a single file.
Parameters
----------
`folder` : str, None
If is None, use `Suite.folder`.
`filename` : str, None
If is None, use `Suite.name` + '_accel'
`save` : boolean
If True, save the file. Else, show immediately.
`engine` : 'pyplot' or 'plotly'
Outputs
-------
A figure is plotted.
'''
folder = self.folder if folder is None else folder
if engine == 'plotly':
filename = f'{self.name}_accel.html' if filename is None else filename + '.html'
fig = go.Figure()
for i, motion in enumerate(self.motions):
fig.add_trace(go.Scatter(
x=np.arange(len(motion.accels)) * motion.dt,
y=motion.accels,
mode='lines',
name=str(i) + motion.name,
text=motion.name
))
if save:
fig.write_html(f'{folder}/{filename}')
else:
fig.show()
elif engine == 'pyplot':
print('Pyplot is not supported.')
else:
print('ERROR: engine should be in "pyplot" and "plotly"')
def plot_all_spectrums(self, folder=None, filename=None, save=True, engine='pyplot'):
'''Plot all amplified spectrums in one file.
Parameters
----------
Refer to `Suite.plot_all_accels`.
The filename if is None will be `Suite.name` + '_spectrum'
Outputs
-------
A figure will be generated.
'''
if engine == 'plotly':
folder = self.folder if folder is None else folder
filename = f'{self.name}_spectrum.html' if filename is None else filename + '.html'
fig = go.Figure()
for i, motion in enumerate(self.motions):
fig.add_trace(go.Scatter(
x=motion.spectrum.periods,
y=motion.spectrum.spectrum * self.factors[i],
mode='lines',
name=motion.name,
text=motion.name
))
spectrum1 = self.target_spectrum
spectrum2 = self.get_average_spectrum()
fig.add_trace(go.Scatter(
x=spectrum1.periods,
y=spectrum1.spectrum,
mode='lines',
name='Target',
text='Target',
line=dict(width=5),
))
fig.add_trace(go.Scatter(
x=spectrum2.periods,
y=spectrum2.spectrum,
mode='lines',
name='Average',
text='Average',
line=dict(width=5),
))
if save:
fig.write_html(f'{folder}/{filename}')
else:
fig.show()
elif engine == 'pyplot':
folder = self.folder if folder is None else folder
filename = f'{self.name}_spectrum.png' if filename is None else filename + '.png'
plt.figure(figsize=(7, 5))
for i, motion in enumerate(self.motions):
plt.plot(motion.spectrum.periods, motion.spectrum.spectrum * self.factors[i], label=motion.name, c='silver')
plt.plot(self.target_spectrum.periods, self.target_spectrum.spectrum, label='Target', linewidth=3)
averageSpectrum = self.get_average_spectrum()
plt.plot(averageSpectrum.periods, averageSpectrum.spectrum, label="average", linewidth=2)
plt.legend(loc='best', fontsize='x-small')
plt.title(f'SRSS={self.get_srss():.4f}')
if save:
plt.savefig(os.path.join(folder, filename), dpi=200)
else:
plt.show()
else:
print("ERROR: wrong engine name. use 'plotly' or 'pyplot'.")
def plot_individual(self, save=True):
'''Plot all individual ground motions and the spectrum.
Parameters
----------
Refer to `Suite.plot_all_accels`.
Outputs
-------
If save, a figure for each motion will be generated.
'''
for i, mo in enumerate(self.motions):
_, ax = plt.subplots(2, 1, figsize=(7, 5))
ax[0].plot(np.arange(len(mo.accel)) * mo.dt,
mo.accel * self.factors[i])
ax[1].plot(mo.spectrum.periods,
mo.spectrum.spectrum * self.factors[i])
ax[1].plot(self.target_spectrum.periods,
self.target_spectrum.spectrum, linewidth=2)
plt.suptitle(
f'{i}: Name {mo.name}\nfactor={self.factors[i]:.4}, PGA={mo.get_PGA()*self.factors[i]:.4}, dt={mo.dt:.3f}')
filename = f'{self.folder}/motion{i}.png'
if save:
plt.savefig(filename, dpi=200)
else:
plt.show()
def plot_interactive(self, save=True):
'''Plot the spectrum and the acceleration time-histories to an interactive html file.
Parameters
----------
`save` : boolean
If True, write to html file. Otherwise, show directly without saving.
Outputs
-------
A html file with interactive plotting sliders.
'''
fig = make_subplots(rows=1, cols=2)
for i in range(len(self.motions)):
fig.add_trace(go.Scatter(
x=self.motions[i].spectrum.periods,
y=self.motions[i].spectrum.spectrum * self.factors[i],
name=self.motions[i].name,
), row=1, col=1)
fig.add_trace(go.Scatter(
x=self.motions[i].get_times(),
y=self.motions[i].accel * self.factors[i],
name=self.motions[i].name,
), row=1, col=2)
spectrum_average = self.get_average_spectrum()
fig.add_trace(go.Scatter(
x=spectrum_average.periods,
y=spectrum_average.spectrum,
name='average',
line={'width': 5}
))
fig.add_trace(go.Scatter(
x=self.target_spectrum.periods,
y=self.target_spectrum.spectrum,
name='target',
line={'width': 5}
))
def _title(i):
return f'Motion #{i}: factor={self.factors[i]:.2f}, duration={len(self.motions[i].accel)*self.motions[i].dt:.2f}, PGA={self.motions[i].get_PGA()*self.factors[i]:.4}'
steps = []
for i in range(len(self.motions)):
step = dict(
method='update',
args=[
{'visible': [j//2 == i for j in range(len(self.motions)*2)] + [False, True]},
{'title': _title(i)}
],
label=str(i),
)
steps.append(step)
sliders = [dict(active=0, steps=steps, len=0.9, currentvalue={'visible': False})]
updatemenus = [dict(
buttons=[dict(
label='All',
method='update',
args=[
{'visible': [True] * (len(self.motions)*2+2)},
{'title': f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}'}
]
)],
type='buttons',
x=1,
y=0,
xanchor='right',
yanchor='top',
pad={'t': 30},
)]
fig.update_layout(sliders=sliders, title=f'Suite {self.name}: {len(self.motions)} motions, SRSS={self.get_srss():.4}', updatemenus=updatemenus)
if save:
fig.write_html(f'{self.folder}/{self.name}.html')
else:
fig.show()
def to_dict(self, separate_motion=True):
'''Convert all the properties to dict.
Parameters
----------
`separate_motion` : boolean
If True, the motion details are not included into the dict.
User has to make sure these motions are not moved.
Returns
-------
`it` : dict
'''
return dict(
folder=self.folder,
name=self.name,
motions=[mo.to_dict() for mo in self.motions] if not separate_motion else [
f'{mo.folder}/{mo.name}' for mo in self.motions],
factors=self.factors if type(self.factors[0] == float) else [
fact.item() for fact in self.factors],
period_bound=self.period_bound,
target_spectrum=self.target_spectrum.to_dict(),
period_samples=self.period_samples.tolist(),
srss=self.get_srss(),
average_spectrum=self.get_average_spectrum().to_dict(),
)
def to_json(self, separate_motion=True):
'''Convert the properties to json.
Parameters
----------
Refer to `Suite.to_dict`.
Returns
-------
`json` : str
'''
return json.dumps(self.to_dict(separate_motion=separate_motion))
def save(self, separate_motion=True, rewrite=False):
'''Save the suite to a .suite file.
Parameters
----------
`separate_motion` : boolean
If True, the motion details are not included into the file.
User has to make sure the motions are not moved.
`rewrite` : boolean
If True, all the '.motion' files will be rewritten.
otherwise, the motion files will be ignored.
Outputs
-------
Create a .suite file to store the suite.
'''
with open(f'{self.folder}/{self.name}.suite', 'w') as f:
f.write(self.to_json(separate_motion=separate_motion))
if separate_motion:
for mo in self.motions:
if not os.path.exists(f'{mo.folder}/{mo.name}.motion'):
mo.save()
print(f'Motion saved to {mo.folder}/{mo.name}.motion')
else:
if rewrite:
mo.save()
print(f'Motion {mo.folder}/{mo.name}.motion is written.')
print(f'Saved to {self.folder}/{self.name}.suite')
@staticmethod
def read_dict(it):
'''Read the dictionary representing the properties of the class.
If key 'motions' is a instance of dict, then construct motions with dict.
else, key 'motions' should be a list of {folder}/{name}, then construct with the .motion file.
Parameters
----------
`it` : dict
The dict generated by `Suite.to_dict`
Returns
-------
`suite` : `Suite`
'''
suite = Suite(
folder=it['folder'],
name=it['name'],
target_spectrum=Spectrum.read_dict(it['target_spectrum']),
period_bound=it['period_bound']
)
for i, mo_content in enumerate(it['motions']):
if isinstance(mo_content, dict):
suite.add_motion(Motion.read_dict(
mo_content), it['factors'][i])
else:
suite.add_motion(Motion.load(
f'{mo_content}.motion'), it['factors'][i])
suite.period_samples = np.array(it['period_samples'])
suite.target_vector = suite.target_spectrum.get(suite.period_samples)
suite.reset_spectrum_matrix()
return suite
@staticmethod
def load(filename):
'''Load a the suite from a file.
Parameters
----------
`filename` : str
File that stores the suite.
Returns
-------
`suite` : `Suite`
'''
if not filename.endswith('.suite'):
filename += '.suite'
with open(filename, 'r') as f:
return Suite.read_dict(json.loads(f.read()))
def copy(self):
'''Copy the instance.
Returns
-------
`suite` : `Suite`
'''
return Suite.read_dict(self.to_dict())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment