Last active
November 28, 2023 00:42
-
-
Save calum-chamberlain/7bb51596909473d579b5252f022e5bda to your computer and use it in GitHub Desktop.
Simple benchmarking for EQcorrscan.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple benchmark of matched-filter in EQcorrscan for PR #544 | |
""" | |
import boto3 | |
import os | |
import logging | |
import pickle | |
from obspy import UTCDateTime | |
from obspy.clients.fdsn import Client | |
from obsplus import WaveBank | |
from botocore import UNSIGNED | |
from botocore.config import Config | |
from eqcorrscan import Tribe, Party | |
from eqcorrscan.utils.catalog_utils import filter_picks | |
Logger = logging.getLogger("BenchLogger") | |
TRIBE_FILE = "benchmark_tribe.pkl" | |
WAVEFORM_DIR = "benchmark_waveforms" | |
STARTTIME = UTCDateTime(2023, 3, 17) | |
ENDTIME = UTCDateTime(2023, 3, 27) | |
STATIONS = ['OPRZ', 'MARZ', 'URZ', 'TARZ', 'OMRZ', | |
'MKRZ', 'LIRZ', 'MWZ', 'RUGZ', 'RRRZ', | |
'KARZ', 'MRHZ', 'PRRZ', 'HLRZ', 'RTZ', | |
'ALRZ', 'RAGZ', 'KAFS', 'SNGZ', 'EDRZ', | |
'HRRZ', 'RAHZ', 'TGRZ', 'TKGZ', 'MTHZ', | |
'HAZ', 'WHRZ', 'KMRZ', 'NGRZ', 'BKZ'] | |
def get_geonet_data(starttime, endtime, stations, outdir): | |
GEONET_AWS = "geonet-open-data" | |
DAY_STRUCT = "waveforms/miniseed/{date.year}/{date.year}.{date.julday:03d}" | |
CHAN_STRUCT = ("{station}.{network}/{date.year}.{date.julday:03d}." | |
"{station}.{location}-{channel}.{network}.D") | |
if not os.path.isdir(outdir): | |
os.makedirs(outdir) | |
bob = boto3.resource('s3', config=Config(signature_version=UNSIGNED)) | |
s3 = bob.Bucket(GEONET_AWS) | |
date = starttime | |
while date < endtime: | |
day_path = DAY_STRUCT.format(date=date) | |
for station in stations: | |
for instrument in "HE": | |
for component in "ZNE12": | |
channel = f"{instrument}H{component}" | |
chan_path = CHAN_STRUCT.format( | |
station=station, network="NZ", | |
date=date, location="10", channel=channel) | |
local_path = os.path.join(outdir, chan_path) | |
if os.path.isfile(local_path): | |
Logger.info(f"Skipping {local_path}: exists") | |
continue | |
os.makedirs(os.path.dirname(local_path), exist_ok=True) | |
remote = "/".join([day_path, chan_path]) | |
Logger.debug(f"Downloading from {remote}") | |
try: | |
s3.download_file(remote, local_path) | |
except Exception as e: | |
Logger.debug(f"Could not download {remote} due to {e}") | |
continue | |
Logger.info(f"Downloaded {remote}") | |
date += 86400 | |
def set_up(local_waveforms: bool = True): | |
if local_waveforms: | |
Logger.info(f"Downloading waveforms between {STARTTIME} and {ENDTIME}") | |
get_geonet_data(starttime=STARTTIME, endtime=ENDTIME + 86400, # Get some extra data | |
stations=STATIONS, outdir=WAVEFORM_DIR) | |
bank = WaveBank(WAVEFORM_DIR) | |
else: | |
try: | |
from cjc_utilities.get_data.geonet_aws_client import AWSClient | |
except ModuleNotFoundError: | |
Logger.error("Did not find cjc-utilities, please install") | |
raise ModuleNotFoundError("Install cjc-utilities") | |
bank = AWSClient() | |
client = Client("GEONET") | |
Logger.info("Downloading events") | |
cat = client.get_events( | |
starttime=STARTTIME, | |
endtime=ENDTIME, | |
latitude=-38.05, longitude=176.73, | |
maxradius=0.5) | |
Logger.info(f"Downloaded {len(cat)} events") | |
cat = filter_picks( | |
cat, | |
stations=STATIONS, | |
phase_hints=["P", "S"], | |
enforce_single_pick="earliest") | |
Logger.info("Constructing tribe") | |
tribe = Tribe().construct( | |
method="from_client", | |
client_id=bank, | |
catalog=cat, | |
lowcut=2.0, | |
highcut=15.0, | |
samp_rate=50.0, | |
filt_order=4, | |
length=3.0, | |
prepick=0.5, | |
swin="all", | |
process_len=86400, | |
all_horix=True, | |
min_snr=4.0, | |
parallel=True) | |
tribe.templates = [t for t in tribe if len({tr.stats.station for tr in t.st}) >= 5] | |
Logger.info(f"Constructed tribe of {len(tribe)} templates") | |
with open(TRIBE_FILE, "wb") as f: | |
pickle.dump(tribe, f) | |
def bench(gpu: bool = False, | |
concurrent_processing: bool = True, | |
group_size: int = None, | |
local_waveforms: bool = False, | |
): | |
Logger.info("Reading tribe from disk") | |
with open(TRIBE_FILE, "rb") as f: | |
tribe = pickle.load(f) | |
Logger.info(f"Read in tribe of {len(tribe)} templates") | |
if local_waveforms: | |
Logger.info(f"Downloading waveforms between {STARTTIME} and {ENDTIME}") | |
get_geonet_data(starttime=STARTTIME, endtime=ENDTIME + 86400, # Get some extra data | |
stations=STATIONS, outdir=WAVEFORM_DIR) | |
bank = WaveBank(WAVEFORM_DIR) | |
else: | |
try: | |
from cjc_utilities.get_data.geonet_aws_client import AWSClient | |
except ModuleNotFoundError: | |
Logger.error("Did not find cjc-utilities, please install") | |
raise ModuleNotFoundError("Install cjc-utilities") | |
bank = AWSClient() | |
xcorr_func = "fftw" | |
if gpu: | |
xcorr_func = "fmf" | |
party = tribe.client_detect( | |
client=bank, starttime=STARTTIME, | |
endtime=ENDTIME, threshold=10, | |
threshold_type="MAD", trig_int=1.0, | |
xcorr_func=xcorr_func, | |
group_size=group_size, | |
concurrent_processing=concurrent_processing, | |
) | |
if __name__ == "__main__": | |
import argparse | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s") | |
parser = argparse.ArgumentParser(description="Benchmark eqcorrscan") | |
parser.add_argument("-s", "--setup", | |
action="store_true", help="Set up before bench marking") | |
parser.add_argument("-c", "--concurrent", | |
action="store_true", help="Use concurrent processing") | |
parser.add_argument("--group-size", type=int, default=None, help="group size to use") | |
parser.add_argument("-g", "--gpu", | |
action="store_true", help="Use FMF GPU code if available") | |
parser.add_argument("-l", "--local-waveforms", action="store_true", | |
help="Pre-download waveforms to simulate local data") | |
args = parser.parse_args() | |
if args.setup: | |
set_up(local_waveforms=args.local_waveforms) | |
else: | |
bench(gpu=args.gpu, concurrent_processing=args.concurrent, | |
group_size=args.group_size, local_waveforms=args.local_waveforms) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment