from __future__ import annotations

import datetime
from typing import Any, Optional

import h5netcdf
import h5py
import numpy as np
import pyproj

# NOTE: these two constants are referenced below but were missing from the
# original snippet; the values here are plausible defaults, not the author's.
GRID_MAPPING_DSET = "spatial_ref"
HDF5_OPTS = {"chunks": (128, 128), "compression": "gzip", "shuffle": True}


def make_product(
    data: np.ndarray,
    geotransform: list[float],
    epsg: int,
    outname: str,
    dts: list[datetime.datetime],
):
    """Write a 3D (time, y, x) stack of complex SLC data to a georeferenced NetCDF file."""
    assert data.ndim == 3
    crs = pyproj.crs.CRS.from_epsg(epsg)
    _add_complex_ctype(outname)
    with h5netcdf.File(outname, "a", invalid_netcdf=True) as f:
        _create_grid_mapping(group=f, crs=crs, gt=geotransform)
        _create_tyx_dsets(group=f, gt=geotransform, times=dts, shape=data[0].shape)
        _create_geo_dataset_3d(
            group=f,
            name="slcs",
            data=data,
            description="SLC complex data",
            fillvalue=np.nan + 1j * np.nan,
            attrs={},
        )


def _create_geo_dataset_3d(
    *,
    group: h5netcdf.Group,
    name: str,
    data: np.ndarray,
    description: str,
    fillvalue: float,
    attrs: Optional[dict[str, Any]],
) -> h5netcdf.Variable:
    dimensions = ["time", "y", "x"]
    if attrs is None:
        attrs = {}
    attrs.update(long_name=description)
    # Copy so the per-call chunk shape doesn't mutate the module-level options
    options = dict(HDF5_OPTS)
    if isinstance(data, str):
        options = {}
        # This is a string, so we need to convert it to bytes or it will fail
        data = np.bytes_(data)
    elif np.array(data).size <= 1:
        # Scalars don't need chunks/compression
        options = {}
    else:
        options["chunks"] = (len(data), *options["chunks"])

    dset = group.create_variable(
        name,
        dimensions=dimensions,
        data=data,
        fillvalue=fillvalue,
        **options,
    )
    dset.attrs.update(attrs)
    dset.attrs["grid_mapping"] = GRID_MAPPING_DSET
    return dset


def _create_grid_mapping(group, crs: pyproj.CRS, gt: list[float]) -> h5netcdf.Variable:
    """Set up the grid mapping variable."""
    # https://github.com/corteva/rioxarray/blob/21284f67db536d9c104aa872ab0bbc261259e59e/rioxarray/rioxarray.py#L34
    dset = group.create_variable(GRID_MAPPING_DSET, (), data=0, dtype=int)
    dset.attrs.update(crs.to_cf())
    # Also add the GeoTransform
    gt_string = " ".join([str(x) for x in gt])
    dset.attrs.update(
        dict(
            GeoTransform=gt_string,
            units="unitless",
            long_name=(
                "Dummy variable containing geo-referencing metadata in attributes"
            ),
        )
    )
    return dset
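

# For reference: pyproj's CRS.to_cf() returns a dict of CF grid-mapping
# attributes (e.g. "grid_mapping_name", "crs_wkt", and ellipsoid parameters),
# which readers like GDAL and rioxarray use together with the GeoTransform.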


def _add_complex_ctype(h5file: str):
    """Add the complex64 type to the root of the HDF5 file.

    This is required for GDAL to recognize the complex data type.
    """
    with h5py.File(h5file, "a") as hf:
        if "complex64" in hf["/"]:
            return
        ctype = h5py.h5t.py_create(np.complex64)
        ctype.commit(hf["/"].id, np.bytes_("complex64"))


def _create_time_array(times: list[datetime.datetime]):
    """Convert datetimes to a CF-style elapsed-seconds array.

    Returns the array plus its metadata, e.g. calendar="standard" and
    units="seconds since 2017-02-03 00:00:00.000000".
    """
    # Create the time array, relative to the first date
    since_time = times[0]
    time = np.array([(t - since_time).total_seconds() for t in times])
    calendar = "standard"
    units = f"seconds since {since_time.strftime('%Y-%m-%d %H:%M:%S.%f')}"
    return time, calendar, units


def _create_yx_arrays(
    gt: list[float], shape: tuple[int, int]
) -> tuple[np.ndarray, np.ndarray]:
    """Create the y and x coordinate arrays."""
    ysize, xsize = shape
    # Parse the geotransform
    x_origin, x_res, _, y_origin, _, y_res = gt
    # Make the x/y arrays
    # Note that these are the center of the pixels, whereas the GeoTransform
    # is the upper left corner of the top left pixel.
    y = np.arange(y_origin + y_res / 2, y_origin + y_res * ysize, y_res)
    x = np.arange(x_origin + x_res / 2, x_origin + x_res * xsize, x_res)
    return y, x
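

# Worked example for _create_yx_arrays (a hypothetical 30 m UTM grid):
# gt = [400000, 30, 0, 4000000, 0, -30] with shape (2, 2) gives pixel centers
# x = [400015.0, 400045.0] and y = [3999985.0, 3999955.0] -- each half a pixel
# in from the corner encoded in the GeoTransform.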


def _create_tyx_dsets(
    group: h5netcdf.Group,
    gt: list[float],
    times: list[datetime.datetime],
    shape: tuple[int, int],
) -> tuple[h5netcdf.Variable, h5netcdf.Variable, h5netcdf.Variable]:
    """Create the time, y, and x coordinate datasets."""
    y, x = _create_yx_arrays(gt, shape)
    times, calendar, units = _create_time_array(times)

    if not group.dimensions:
        group.dimensions = dict(time=times.size, y=y.size, x=x.size)

    # Create the datasets
    t_ds = group.create_variable("time", ("time",), data=times, dtype=float)
    y_ds = group.create_variable("y", ("y",), data=y, dtype=float)
    x_ds = group.create_variable("x", ("x",), data=x, dtype=float)

    t_ds.attrs["standard_name"] = "time"
    t_ds.attrs["long_name"] = "time"
    t_ds.attrs["calendar"] = calendar
    t_ds.attrs["units"] = units

    for name, ds in zip(["y", "x"], [y_ds, x_ds]):
        ds.attrs["standard_name"] = f"projection_{name}_coordinate"
        ds.attrs["long_name"] = f"{name.replace('_', ' ')} coordinate of projection"
        ds.attrs["units"] = "m"

    return t_ds, y_ds, x_ds
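

# A minimal usage sketch (not from the original gist): writes a small random
# SLC stack. The EPSG code, geotransform, and dates here are made-up examples.
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    ntime, rows, cols = 3, 256, 256
    slc_stack = (
        rng.normal(size=(ntime, rows, cols)) + 1j * rng.normal(size=(ntime, rows, cols))
    ).astype(np.complex64)
    # GDAL-style geotransform: [x_origin, x_res, 0, y_origin, 0, y_res]
    gt = [400000.0, 30.0, 0.0, 4000000.0, 0.0, -30.0]
    dates = [
        datetime.datetime(2023, 1, 1) + datetime.timedelta(days=12 * i)
        for i in range(ntime)
    ]
    make_product(slc_stack, gt, epsg=32611, outname="slc_stack.nc", dts=dates)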


# --- Second snippet: lazily loading a stack of OPERA CSLC files with xarray + dask ---
import pandas as pd
import xarray as xr

from dolphin.utils import get_dates
from dolphin.workflows.config import OPERA_DATASET_NAME


def prep(ds):
    """Add a time dimension, parsed from each file's name, before concatenation."""
    fname = ds.encoding["source"]
    date = get_dates(fname)[0]
    return ds.expand_dims(time=[pd.to_datetime(date)])


def get_cslc_dask(file_list, group=OPERA_DATASET_NAME.strip("VV"), xy_chunks=256):
    """Open a list of CSLC files as one lazily-chunked dataset."""
    # Stripping "VV" gives the parent group holding the polarization dataset
    chunks = {
        "time": len(file_list),
        "y_coordinates": xy_chunks,
        "x_coordinates": xy_chunks,
    }
    ds = xr.open_mfdataset(
        file_list, engine="h5netcdf", group=group, preprocess=prep, chunks=chunks
    )
    # Need to rechunk: open_mfdataset gives a chunk size of 1 along time (one per file)
    return ds.chunk(chunks)
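

# Hypothetical usage sketch (not from the original gist); the filename pattern
# is an assumption and requires the dolphin package to be installed.
if __name__ == "__main__":
    from glob import glob

    cslc_files = sorted(glob("OPERA_L2_CSLC-S1_*.h5"))
    ds = get_cslc_dask(cslc_files)
    print(ds)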