Skip to content

Instantly share code, notes, and snippets.

@calum-chamberlain
Last active November 28, 2023 00:42
Show Gist options
  • Save calum-chamberlain/7bb51596909473d579b5252f022e5bda to your computer and use it in GitHub Desktop.
Save calum-chamberlain/7bb51596909473d579b5252f022e5bda to your computer and use it in GitHub Desktop.
Simple benchmarking for EQcorrscan.
"""
Simple benchmark of matched-filter in EQcorrscan for PR #544
"""
import boto3
import os
import logging
import pickle
from obspy import UTCDateTime
from obspy.clients.fdsn import Client
from obsplus import WaveBank
from botocore import UNSIGNED
from botocore.config import Config
from eqcorrscan import Tribe, Party
from eqcorrscan.utils.catalog_utils import filter_picks
Logger = logging.getLogger("BenchLogger")
TRIBE_FILE = "benchmark_tribe.pkl"
WAVEFORM_DIR = "benchmark_waveforms"
STARTTIME = UTCDateTime(2023, 3, 17)
ENDTIME = UTCDateTime(2023, 3, 27)
STATIONS = ['OPRZ', 'MARZ', 'URZ', 'TARZ', 'OMRZ',
'MKRZ', 'LIRZ', 'MWZ', 'RUGZ', 'RRRZ',
'KARZ', 'MRHZ', 'PRRZ', 'HLRZ', 'RTZ',
'ALRZ', 'RAGZ', 'KAFS', 'SNGZ', 'EDRZ',
'HRRZ', 'RAHZ', 'TGRZ', 'TKGZ', 'MTHZ',
'HAZ', 'WHRZ', 'KMRZ', 'NGRZ', 'BKZ']
def get_geonet_data(starttime, endtime, stations, outdir):
GEONET_AWS = "geonet-open-data"
DAY_STRUCT = "waveforms/miniseed/{date.year}/{date.year}.{date.julday:03d}"
CHAN_STRUCT = ("{station}.{network}/{date.year}.{date.julday:03d}."
"{station}.{location}-{channel}.{network}.D")
if not os.path.isdir(outdir):
os.makedirs(outdir)
bob = boto3.resource('s3', config=Config(signature_version=UNSIGNED))
s3 = bob.Bucket(GEONET_AWS)
date = starttime
while date < endtime:
day_path = DAY_STRUCT.format(date=date)
for station in stations:
for instrument in "HE":
for component in "ZNE12":
channel = f"{instrument}H{component}"
chan_path = CHAN_STRUCT.format(
station=station, network="NZ",
date=date, location="10", channel=channel)
local_path = os.path.join(outdir, chan_path)
if os.path.isfile(local_path):
Logger.info(f"Skipping {local_path}: exists")
continue
os.makedirs(os.path.dirname(local_path), exist_ok=True)
remote = "/".join([day_path, chan_path])
Logger.debug(f"Downloading from {remote}")
try:
s3.download_file(remote, local_path)
except Exception as e:
Logger.debug(f"Could not download {remote} due to {e}")
continue
Logger.info(f"Downloaded {remote}")
date += 86400
def set_up(local_waveforms: bool = True):
if local_waveforms:
Logger.info(f"Downloading waveforms between {STARTTIME} and {ENDTIME}")
get_geonet_data(starttime=STARTTIME, endtime=ENDTIME + 86400, # Get some extra data
stations=STATIONS, outdir=WAVEFORM_DIR)
bank = WaveBank(WAVEFORM_DIR)
else:
try:
from cjc_utilities.get_data.geonet_aws_client import AWSClient
except ModuleNotFoundError:
Logger.error("Did not find cjc-utilities, please install")
raise ModuleNotFoundError("Install cjc-utilities")
bank = AWSClient()
client = Client("GEONET")
Logger.info("Downloading events")
cat = client.get_events(
starttime=STARTTIME,
endtime=ENDTIME,
latitude=-38.05, longitude=176.73,
maxradius=0.5)
Logger.info(f"Downloaded {len(cat)} events")
cat = filter_picks(
cat,
stations=STATIONS,
phase_hints=["P", "S"],
enforce_single_pick="earliest")
Logger.info("Constructing tribe")
tribe = Tribe().construct(
method="from_client",
client_id=bank,
catalog=cat,
lowcut=2.0,
highcut=15.0,
samp_rate=50.0,
filt_order=4,
length=3.0,
prepick=0.5,
swin="all",
process_len=86400,
all_horix=True,
min_snr=4.0,
parallel=True)
tribe.templates = [t for t in tribe if len({tr.stats.station for tr in t.st}) >= 5]
Logger.info(f"Constructed tribe of {len(tribe)} templates")
with open(TRIBE_FILE, "wb") as f:
pickle.dump(tribe, f)
def bench(gpu: bool = False,
concurrent_processing: bool = True,
group_size: int = None,
local_waveforms: bool = False,
):
Logger.info("Reading tribe from disk")
with open(TRIBE_FILE, "rb") as f:
tribe = pickle.load(f)
Logger.info(f"Read in tribe of {len(tribe)} templates")
if local_waveforms:
Logger.info(f"Downloading waveforms between {STARTTIME} and {ENDTIME}")
get_geonet_data(starttime=STARTTIME, endtime=ENDTIME + 86400, # Get some extra data
stations=STATIONS, outdir=WAVEFORM_DIR)
bank = WaveBank(WAVEFORM_DIR)
else:
try:
from cjc_utilities.get_data.geonet_aws_client import AWSClient
except ModuleNotFoundError:
Logger.error("Did not find cjc-utilities, please install")
raise ModuleNotFoundError("Install cjc-utilities")
bank = AWSClient()
xcorr_func = "fftw"
if gpu:
xcorr_func = "fmf"
party = tribe.client_detect(
client=bank, starttime=STARTTIME,
endtime=ENDTIME, threshold=10,
threshold_type="MAD", trig_int=1.0,
xcorr_func=xcorr_func,
group_size=group_size,
concurrent_processing=concurrent_processing,
)
if __name__ == "__main__":
import argparse
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s")
parser = argparse.ArgumentParser(description="Benchmark eqcorrscan")
parser.add_argument("-s", "--setup",
action="store_true", help="Set up before bench marking")
parser.add_argument("-c", "--concurrent",
action="store_true", help="Use concurrent processing")
parser.add_argument("--group-size", type=int, default=None, help="group size to use")
parser.add_argument("-g", "--gpu",
action="store_true", help="Use FMF GPU code if available")
parser.add_argument("-l", "--local-waveforms", action="store_true",
help="Pre-download waveforms to simulate local data")
args = parser.parse_args()
if args.setup:
set_up(local_waveforms=args.local_waveforms)
else:
bench(gpu=args.gpu, concurrent_processing=args.concurrent,
group_size=args.group_size, local_waveforms=args.local_waveforms)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment