Last active
September 8, 2025 16:35
-
-
Save scottstanie/c5a7ade5ef2083716d91a90231d6dfbc to your computer and use it in GitHub Desktop.
Create a reference parquet file for one OPERA DISP-S1 frame
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import argparse | |
| import multiprocessing | |
| import warnings | |
| import xarray as xr | |
| from dask import compute, delayed | |
| from dask.distributed import Client | |
| from obstore.store import S3Store | |
| from virtualizarr import open_virtual_dataset | |
| from virtualizarr.parsers import HDFParser | |
| from virtualizarr.registry import ObjectStoreRegistry | |
| from opera_utils.credentials import AWSCredentials | |
| from opera_utils.disp import DispProductStack, search | |
# Silence the specific UserWarning that numcodecs emits when its codecs are
# recorded in Zarr v3 metadata (expected here; VirtualiZarr writes them anyway).
warnings.filterwarnings(
    "ignore", message="Numcodecs codecs are not in the Zarr version 3 specification.*"
)
def create_virtual_dataset(
    frame_id: int, output_file: str, max_files: int | None = None
):
    """Build a virtual Parquet reference for an OPERA DISP-S1 frame.

    This function performs the following steps:
    1. Searches for all DISP-S1 products for a given frame ID.
    2. Obtains temporary S3 credentials for in-region access.
    3. Creates an S3Store, HDFParser, and ObjectStoreRegistry as required by V2.
    4. Creates individual virtual datasets for each file in parallel using Dask.
    5. Combines the individual datasets into a single xarray.Dataset.
    6. Saves the final combined reference to a Parquet file using the new .vz accessor.

    Parameters
    ----------
    frame_id : int
        The OPERA frame ID to process (e.g., 44055).
    output_file : str
        The path for the output Parquet reference file.
    max_files : int, optional
        Maximum number of files to process for a quick test. Processes all by default.
    """
    print(f"π°οΈ Searching for OPERA DISP-S1 data for frame ID: {frame_id}")
    # 1. Search for data and get S3 URLs using opera_utils
    dps = DispProductStack(search(frame_id=frame_id, url_type="s3"))
    if not dps:
        print(f"β No products found for frame ID {frame_id}.")
        return
    s3_urls = dps.filenames
    dates = dps.secondary_dates
    if max_files:
        print(f"Limiting to the first {max_files} files for this run.")
        s3_urls = s3_urls[:max_files]
        dates = dates[:max_files]
    print(f"β Found {len(s3_urls)} products.")

    # 2. VIRTUALIZARR V2 SETUP: Create explicit store and parser objects.
    # First, get temporary AWS credentials for in-region S3 access.
    print("π Obtaining temporary AWS credentials...")
    creds = AWSCredentials.from_asf()
    # Next, define the S3 bucket and create the S3Store object.
    bucket = "asf-cumulus-prod-opera-products"
    store = S3Store(
        bucket=bucket,
        access_key_id=creds.access_key_id,
        secret_access_key=creds.secret_access_key,
        session_token=creds.session_token,
        region="us-west-2",
    )
    # 3. Create the registry mapping the S3 prefix to the store, plus the parser.
    registry = ObjectStoreRegistry({f"s3://{bucket}": store})
    parser = HDFParser()
    print("β VirtualiZarr v2.0 components (Store, Registry, Parser) are configured.")

    # 4. Set up parallel processing with a local Dask cluster.
    # Leave one core free for the main process; never go below one worker.
    n_workers = max(min(32, multiprocessing.cpu_count() - 1), 1)
    client = Client(n_workers=n_workers, threads_per_worker=1)
    print(
        f"π Dask client started with {n_workers} workers. Dashboard: {client.dashboard_link}"
    )
    # Ensure the cluster is shut down even if any step below raises.
    try:
        # Create one Dask 'delayed' task per file using the V2 API signature.
        open_vds_delayed = delayed(open_virtual_dataset)
        tasks = [
            open_vds_delayed(url, registry=registry, parser=parser)
            for url in s3_urls
        ]
        # Execute all tasks in parallel.
        print("Generating individual virtual datasets in parallel...")
        separate_datasets = compute(tasks)[0]
        print("Individual datasets created.")

        # 5. Combine the list of datasets into a single virtual timeseries dataset.
        print("Combining datasets into a single time series...")
        virtual_ds_combined = xr.combine_nested(
            separate_datasets,
            concat_dim="time",
            coords="minimal",
            compat="override",
            combine_attrs="drop_conflicts",
        )
        print("β Combined virtual dataset created:")
        print(virtual_ds_combined)

        # 6. Save the final reference using the new .vz accessor.
        print(f"πΎ Saving combined reference to {output_file}...")
        virtual_ds_combined.vz.to_kerchunk(output_file, format="parquet")
        print(f"π Success! Reference file saved to '{output_file}'.")
    finally:
        client.close()
def main():
    """Parse command-line arguments and run the script."""
    cli = argparse.ArgumentParser(
        description="Build a virtual Parquet reference for an OPERA DISP-S1 frame using VirtualiZarr v2.0.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument("frame_id", type=int, help="The OPERA frame ID (e.g., 44055).")
    cli.add_argument(
        "-o",
        "--output",
        default="disp_s1_reference_{frame_id:05d}.parq",
        help="Output Parquet reference file name (default: %(default)s).",
    )
    cli.add_argument(
        "--max-files",
        type=int,
        default=None,
        help="Process only the first N files, for testing purposes.",
    )
    args = cli.parse_args()

    # Expand the {frame_id} placeholder only when the output pattern contains one;
    # otherwise use the user-supplied name verbatim.
    output = args.output
    if "{frame_id" in output:
        output = output.format(frame_id=args.frame_id)

    create_virtual_dataset(args.frame_id, output, args.max_files)
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import time | |
| from pathlib import Path | |
| import opera_utils.disp.fit | |
| import tyro | |
| import xarray as xr | |
def create(
    virt_file: Path,
    /,
    outfile: Path | None = None,
    x_chunk: int = 1024,
    y_chunk: int = 1024,
    start_time: str = '2020-01-01',
):
    """Fit a displacement time series from a virtual (kerchunk) reference file.

    Opens ``virt_file`` with the kerchunk engine, fits the time series starting
    at ``start_time``, and writes the result next to the input as a NetCDF file
    (unless ``outfile`` is given).

    NOTE(review): ``x_chunk`` and ``y_chunk`` are accepted but never used in
    this body — confirm whether chunked reading was intended.
    """
    started_at = time.time()
    if outfile is None:
        outfile = virt_file.parent / f"fit_{virt_file.stem}.nc"
    print(f"Saving to {outfile}")
    dataset = xr.open_dataset(str(virt_file), engine='kerchunk')
    fitted = opera_utils.disp.fit.fit_disp_timeseries(
        dataset.sel(time=slice(start_time, None))
    )
    fitted.to_netcdf(outfile, engine='h5netcdf')
    print(f"Finished {virt_file} in {time.time() - started_at:.2f} seconds")
# Script entry point: tyro builds the CLI from create()'s signature and runs it.
if __name__ == "__main__":
    tyro.cli(create)
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running with uv: result on an 8-CPU EC2 instance