Skip to content

Instantly share code, notes, and snippets.

@scottstanie
Last active September 8, 2025 16:35
Show Gist options
  • Select an option

  • Save scottstanie/c5a7ade5ef2083716d91a90231d6dfbc to your computer and use it in GitHub Desktop.

Select an option

Save scottstanie/c5a7ade5ef2083716d91a90231d6dfbc to your computer and use it in GitHub Desktop.
Create a reference parquet file for one OPERA DISP-S1 frame
#!/usr/bin/env python
import argparse
import multiprocessing
import warnings
import xarray as xr
from dask import compute, delayed
from dask.distributed import Client
from obstore.store import S3Store
from virtualizarr import open_virtual_dataset
from virtualizarr.parsers import HDFParser
from virtualizarr.registry import ObjectStoreRegistry
from opera_utils.credentials import AWSCredentials
from opera_utils.disp import DispProductStack, search
# ignore the specific UserWarning from numcodecs
warnings.filterwarnings(
"ignore", message="Numcodecs codecs are not in the Zarr version 3 specification.*"
)
def create_virtual_dataset(
    frame_id: int, output_file: str, max_files: int | None = None
):
    """Build a virtual Parquet reference for an OPERA DISP-S1 frame.

    This function performs the following steps:

    1. Searches for all DISP-S1 products for a given frame ID.
    2. Obtains temporary S3 credentials for in-region access.
    3. Creates an S3Store, HDFParser, and ObjectStoreRegistry as required by V2.
    4. Creates individual virtual datasets for each file in parallel using Dask.
    5. Combines the individual datasets into a single xarray.Dataset.
    6. Saves the final combined reference to a Parquet file using the new .vz accessor.

    Parameters
    ----------
    frame_id : int
        The OPERA frame ID to process (e.g., 44055).
    output_file : str
        The path for the output Parquet reference file.
    max_files : int, optional
        Maximum number of files to process for a quick test. Processes all by default.
    """
    print(f"πŸ›°οΈ Searching for OPERA DISP-S1 data for frame ID: {frame_id}")
    # 1. Search for data and get S3 URLs using opera_utils.
    # Check the raw search results *before* constructing the stack, so an empty
    # result is reported cleanly instead of possibly failing inside the constructor.
    results = search(frame_id=frame_id, url_type="s3")
    if not results:
        print(f"❌ No products found for frame ID {frame_id}.")
        return
    dps = DispProductStack(results)
    s3_urls = dps.filenames
    if max_files is not None:
        print(f"Limiting to the first {max_files} files for this run.")
        s3_urls = s3_urls[:max_files]
    print(f"βœ… Found {len(s3_urls)} products.")

    # 2. VIRTUALIZARR V2 SETUP: Create explicit store and parser objects.
    # First, get temporary AWS credentials for in-region S3 access.
    print("πŸ”‘ Obtaining temporary AWS credentials...")
    creds = AWSCredentials.from_asf()
    # Next, define the S3 bucket and create the S3Store object.
    bucket = "asf-cumulus-prod-opera-products"
    store = S3Store(
        bucket=bucket,
        access_key_id=creds.access_key_id,
        secret_access_key=creds.secret_access_key,
        session_token=creds.session_token,
        region="us-west-2",
    )
    # Create the registry to map the S3 prefix to the store object.
    registry = ObjectStoreRegistry({f"s3://{bucket}": store})
    # Create the HDFParser; the V2 API takes the registry/parser at open time.
    parser = HDFParser()
    print("βœ… VirtualiZarr v2.0 components (Store, Registry, Parser) are configured.")

    # 3. Set up parallel processing with a local Dask cluster.
    # Leave one core free for the scheduler/OS; cap at 32 workers.
    n_workers = max(min(32, multiprocessing.cpu_count() - 1), 1)
    client = Client(n_workers=n_workers, threads_per_worker=1)
    print(
        f"πŸš€ Dask client started with {n_workers} workers. Dashboard: {client.dashboard_link}"
    )
    try:
        # 4. Create a Dask 'delayed' task per file using the V2 API signature.
        open_vds_delayed = delayed(open_virtual_dataset)
        tasks = [
            open_vds_delayed(url, registry=registry, parser=parser)
            for url in s3_urls
        ]

        # 5. Execute all tasks in parallel; compute() returns a 1-tuple here.
        print("Generating individual virtual datasets in parallel...")
        separate_datasets = compute(tasks)[0]
        print("Individual datasets created.")

        # 6. Combine the list of datasets into a single virtual time-series dataset.
        print("Combining datasets into a single time series...")
        virtual_ds_combined = xr.combine_nested(
            separate_datasets,
            concat_dim="time",
            coords="minimal",
            compat="override",
            combine_attrs="drop_conflicts",
        )
        print("βœ… Combined virtual dataset created:")
        print(virtual_ds_combined)

        # 7. Save the final reference using the new .vz accessor.
        print(f"πŸ’Ύ Saving combined reference to {output_file}...")
        virtual_ds_combined.vz.to_kerchunk(output_file, format="parquet")
        print(f"πŸŽ‰ Success! Reference file saved to '{output_file}'.")
    finally:
        # Always shut the cluster down, even if a step above raised.
        client.close()
def main():
    """Parse command-line arguments and run the script."""
    parser = argparse.ArgumentParser(
        description="Build a virtual Parquet reference for an OPERA DISP-S1 frame using VirtualiZarr v2.0.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("frame_id", type=int, help="The OPERA frame ID (e.g., 44055).")
    parser.add_argument(
        "-o",
        "--output",
        default="disp_s1_reference_{frame_id:05d}.parq",
        help="Output Parquet reference file name (default: %(default)s).",
    )
    parser.add_argument(
        "--max-files",
        type=int,
        default=None,
        help="Process only the first N files, for testing purposes.",
    )
    args = parser.parse_args()
    # Substitute the frame ID into the output template when the user (or the
    # default) included a "{frame_id...}" placeholder; otherwise use it verbatim.
    output = (
        args.output.format(frame_id=args.frame_id)
        if "{frame_id" in args.output
        else args.output
    )
    create_virtual_dataset(args.frame_id, output, args.max_files)
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
#!/usr/bin/env python
import time
from pathlib import Path
import opera_utils.disp.fit
import tyro
import xarray as xr
def create(
    virt_file: Path,
    /,
    outfile: Path | None = None,
    x_chunk: int = 1024,
    y_chunk: int = 1024,
    start_time: str = "2020-01-01",
):
    """Fit a displacement time series from a virtual (kerchunk) reference file.

    Opens ``virt_file`` with the kerchunk engine, runs
    ``opera_utils.disp.fit.fit_disp_timeseries`` on the time slice starting at
    ``start_time``, and writes the result to a NetCDF file.

    Parameters
    ----------
    virt_file : Path
        Path to the virtual reference file (positional-only).
    outfile : Path, optional
        Output NetCDF path. Defaults to ``fit_<stem>.nc`` next to ``virt_file``.
    x_chunk, y_chunk : int
        Chunk sizes. NOTE(review): currently accepted but unused in the body —
        presumably intended for ds.chunk(...); kept for CLI compatibility.
    start_time : str
        ISO date; only times at or after this are fitted.
    """
    t0 = time.time()
    if outfile is None:
        # Default output name is derived from the input stem, in the same directory.
        outfile = virt_file.parent / f"fit_{virt_file.stem}.nc"
    print(f"Saving to {outfile}")
    ds = xr.open_dataset(str(virt_file), engine='kerchunk')
    try:
        o = opera_utils.disp.fit.fit_disp_timeseries(ds.sel(time=slice(start_time, None)))
        o.to_netcdf(outfile, engine='h5netcdf')
    finally:
        # Close the dataset even if fitting/writing fails (avoids a leaked handle).
        ds.close()
    print(f"Finished {virt_file} in {time.time() - t0:.2f} seconds")
# CLI entry point: tyro builds the command-line interface from create()'s signature.
if __name__ == "__main__":
    tyro.cli(create)
@scottstanie
Copy link
Author

scottstanie commented Aug 12, 2025

Running with uv:

uv venv -p 3.12 --managed-python
uv pip install ipython "dask[distributed]" xarray "virtualizarr[netcdf3,hdf]>=2" fastparquet h5netcdf "opera-utils[disp]" fsspec s3fs h5py
# Create one for frame ID 44055 over New Orleans
uv run python build_virtual_reference.py 44055

Result on an 8-CPU EC2 instance:

βœ… Combined virtual dataset created:
<xarray.Dataset> Size: 260GB
Dimensions:                         (time: 209, y: 7721, x: 9462)
Coordinates:
  * y                               (y) float64 62kB 3.487e+06 ... 3.256e+06
  * x                               (x) float64 76kB 7.268e+04 ... 3.565e+05
  * time                            (time) datetime64[ns] 2kB 2016-07-26T00:0...
Data variables:
    spatial_ref                     (time) int64 2kB ManifestArray<shape=(209...
    reference_time                  (time) float64 2kB ManifestArray<shape=(2...
    displacement                    (time, y, x) float32 61GB ManifestArray<s...
    recommended_mask                (time, y, x) uint8 15GB ManifestArray<sha...
    temporal_coherence              (time, y, x) float32 61GB ManifestArray<s...
    phase_similarity                (time, y, x) float32 61GB ManifestArray<s...
    timeseries_inversion_residuals  (time, y, x) float32 61GB ManifestArray<s...
Attributes:
    Conventions:         CF-1.8
    contact:             opera-sds-ops@jpl.nasa.gov
    institution:         NASA JPL
    mission_name:        OPERA
    reference_document:  JPL D-108765
    title:               OPERA_L3_DISP-S1 Product
πŸ’Ύ Saving combined reference to disp_s1_reference_44055.parq...
πŸŽ‰ Success! Reference file saved to 'disp_s1_reference_44055.parq'.

real	8m31.032s
user	4m34.756s
sys	6m25.815s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment