Create zarr metadata to read arbitrary binary file
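
# The functions below build a kerchunk-style "reference filesystem" (rfs): a dict of
# Zarr metadata plus chunk references, where each chunk either holds inline
# base64-encoded bytes or points to a [filepath, offset, length] byte range in an
# external binary file, so zarr/xarray can read the data in place without copying it.
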
import numpy as np
import json
import base64
import xarray as xr


def _add_dataset_to_rfs(
    rfs: dict,
    shape: list[int],
    dtype: np.dtype,
    dset_name: str,
    chunk_len: int = None,
    dims: list[str] = None,
    units: str = None,
    order: str = "C",
    description: str = None,
    scale_factor: float = None,
    add_offset: float = None,
    fill_value=None,
) -> None:
    """
    Add a dataset to the reference filesystem.

    Parameters
    ----------
    rfs : dict
        Reference file system dictionary
    shape : list
        The shape of the dataset.
    dtype : np.dtype
        The data type of the dataset.
    dset_name : str
        The name of the dataset.
    chunk_len : int, optional
        The length of the chunks used to divide the dataset along the first
        dimension. If None, the dataset is unchunked.
    dims : list, optional
        Labels for the dimensions of the dataset. If None, phony dimension labels
        will be generated.
    units : str, optional
        Units of the dataset values, stored as an attribute.
    order : {"C", "F"}
        Order of values in array. Default is "C".
    description : str, optional
        Description of the dataset, stored as an attribute.
    scale_factor : float, optional
        If the dataset is read with mask_and_scale=True, the dataset will be scaled
        by this factor. If None, no scaling is applied.
    add_offset : float, optional
        If the dataset is read with mask_and_scale=True, this value will be added to
        the scaled dataset. If None, no offset is added.
    fill_value : optional
        This value denotes a missing value in the dataset. When reading with
        mask_and_scale=True, these values will be converted to NaNs. If None, no fill
        value is used. Note that this is a different usage from a Zarr or HDF5 fill
        value.
    """
    if dims is None:
        dims = [f"phony_dim_{i}" for i in range(len(shape))]
    if chunk_len is None:
        chunk_len = shape[0]
    dset_shape = list(shape)
    chunk_shape = [chunk_len] + dset_shape[1:]
    # Zarr v2 array metadata; no compressor or filters because the bytes are raw
    zarray = dict(
        chunks=chunk_shape,
        compressor=None,
        dtype=dtype.str,
        fill_value=fill_value,
        filters=None,
        order=order,
        shape=dset_shape,
        zarr_format=2,
    )
    # _ARRAY_DIMENSIONS is the xarray/zarr convention for named dimensions
    attrs = dict(_ARRAY_DIMENSIONS=dims)
    if units is not None:
        attrs.update(units=units)
    if description is not None:
        attrs.update(description=description)
    if scale_factor is not None:
        attrs.update(scale_factor=scale_factor)
    if add_offset is not None:
        attrs.update(add_offset=add_offset)
    rfs["refs"].update(
        {
            f"{dset_name}/.zarray": json.dumps(zarray),
            f"{dset_name}/.zattrs": json.dumps(attrs),
        }
    )
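
# Illustrative example (hypothetical values): for a (100, 100) int64 dataset named
# "data" with chunk_len=10, dims=["time", "channels"], and units="uV", this adds:
#   rfs["refs"]["data/.zarray"] -> '{"chunks": [10, 100], "compressor": null,
#                                    "dtype": "<i8", ..., "shape": [100, 100], ...}'
#   rfs["refs"]["data/.zattrs"] -> '{"_ARRAY_DIMENSIONS": ["time", "channels"],
#                                    "units": "uV"}'
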


def add_internal_dataset_to_rfs(
    rfs: dict,
    data: np.ndarray,
    dset_name: str,
    chunk_len: int = None,
    dims: list = None,
    units: str = None,
    order: str = "C",
    description: str = None,
    scale_factor=None,
    add_offset=None,
    fill_value=None,
) -> None:
    """
    Add a dataset to the reference filesystem inline as base64.

    Parameters
    ----------
    rfs : dict
        Reference file system dictionary
    data : np.ndarray
        The dataset to be added to the reference file system.
    dset_name : str
        The name of the dataset.
    chunk_len : int, optional
        The length of the chunks used to divide the dataset. If None, the dataset is
        unchunked.
    dims : list, optional
        Labels for the dimensions of the dataset. If None, phony dimension labels
        will be generated.
    units : str, optional
        Units of the dataset values, stored as an attribute.
    order : {"C", "F"}
        Order of values in array. Default is "C".
    description : str, optional
        Description of the dataset, stored as an attribute.
    scale_factor : float, optional
        If the dataset is read with mask_and_scale=True, the dataset will be scaled
        by this factor. If None, no scaling is applied.
    add_offset : float, optional
        If the dataset is read with mask_and_scale=True, this value will be added to
        the scaled dataset. If None, no offset is added.
    fill_value : optional
        This value denotes a missing value in the dataset. When reading with
        mask_and_scale=True, these values will be converted to NaNs. If None, no fill
        value is used. Note that this is a different usage from a Zarr or HDF5 fill
        value.
    """
    _add_dataset_to_rfs(
        rfs=rfs,
        shape=list(data.shape),
        dtype=data.dtype,
        dset_name=dset_name,
        chunk_len=chunk_len,
        dims=dims,
        units=units,
        order=order,
        description=description,
        scale_factor=scale_factor,
        add_offset=add_offset,
        fill_value=fill_value,
    )
    # the entire array is stored inline as a single base64-encoded chunk
    base64_encoded = base64.b64encode(data.tobytes(order=order))
    chunk_key = ".".join(["0"] * data.ndim)
    rfs["refs"][f"{dset_name}/{chunk_key}"] = "base64:" + base64_encoded.decode()


def add_external_dataset_to_rfs(
    rfs: dict,
    filepath: str,
    shape: tuple,
    dtype: np.dtype,
    offset: int = 0,
    dset_name: str = "data",
    chunk_len: int = None,
    dims: list = None,
    units: str = None,
    order: str = "C",
    description: str = None,
    scale_factor: float = None,
    add_offset: float = None,
    fill_value=None,
) -> None:
    """
    Add a dataset to the reference filesystem as byte-range references into an
    external file.

    Parameters
    ----------
    rfs : dict
        Reference file system dictionary
    filepath : str
        The name of the file where the dataset is stored.
    shape : tuple
        The shape of the dataset.
    dtype : np.dtype
        The data type of the dataset.
    offset : int, default=0
        The initial byte offset in the file where the dataset starts.
    dset_name : str, default="data"
        The name of the dataset.
    chunk_len : int, optional
        The length of the chunks used to divide the dataset along the first
        dimension. If None, the dataset is unchunked. shape[0] should be a multiple
        of chunk_len; any remaining rows are not referenced.
    dims : list, optional
        Labels for the dimensions of the dataset. If None, phony dimension labels
        will be generated.
    units : str, optional
        Units of the dataset values, stored as an attribute.
    order : {"C", "F"}
        Order of values in array. Default is "C".
    description : str, optional
        Description of the dataset, stored as an attribute.
    scale_factor : float, optional
        If the dataset is read with mask_and_scale=True, the dataset will be scaled
        by this factor. If None, no scaling is applied.
    add_offset : float, optional
        If the dataset is read with mask_and_scale=True, this value will be added to
        the scaled dataset. If None, no offset is added.
    fill_value : optional
        This value denotes a missing value in the dataset. When reading with
        mask_and_scale=True, these values will be converted to NaNs. If None, no fill
        value is used. Note that this is a different usage from a Zarr or HDF5 fill
        value.
    """
    _add_dataset_to_rfs(
        rfs=rfs,
        shape=list(shape),
        dtype=dtype,
        dset_name=dset_name,
        chunk_len=chunk_len,
        dims=dims,
        units=units,
        order=order,
        description=description,
        scale_factor=scale_factor,
        add_offset=add_offset,
        fill_value=fill_value,
    )
    if chunk_len is None:
        chunk_len = shape[0]
    n_chunks = shape[0] // chunk_len
    chunk_shape = [chunk_len] + list(shape[1:])
    chunk_size = int(np.prod(chunk_shape) * dtype.itemsize)
    # each chunk reference is [filepath, byte offset, byte length]; chunks are
    # assumed to be stored contiguously in the file
    for i_chunk in range(n_chunks):
        key = f"{dset_name}/{i_chunk}" + ".0" * (len(shape) - 1)
        rfs["refs"][key] = [filepath, offset, chunk_size]
        offset += chunk_size
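
# Illustrative example (values hypothetical): a (100, 100) int64 dataset stored in
# "example4.h5" starting at byte 2048 with chunk_len=10 produces 10 chunk references
# of 10 * 100 * 8 = 8000 bytes each:
#   rfs["refs"]["data/0.0"] -> ["example4.h5", 2048, 8000]
#   rfs["refs"]["data/1.0"] -> ["example4.h5", 10048, 8000]
#   ... and so on through "data/9.0"
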


def create_rfs() -> dict:
    """
    Create an empty reference filesystem dictionary containing a root Zarr group.

    Returns
    -------
    dict
        A dictionary representing Zarr-like metadata and chunk references.
    """
    rfs = {"version": 1, "refs": {".zgroup": json.dumps(dict(zarr_format=2))}}
    return rfs
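
# A freshly created rfs looks like:
#   {"version": 1, "refs": {".zgroup": '{"zarr_format": 2}'}}
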


def read_xarray_from_rfs(
    rfs: dict, remote_protocol: str = "file", mask_and_scale: bool = False
):
    """
    Read an xarray dataset from a reference filesystem.

    Parameters
    ----------
    rfs : dict
        The reference filesystem containing the dataset.
    remote_protocol : str, default="file"
        The protocol used to access the referenced files, e.g. "file" or "http".
    mask_and_scale : bool, default=False
        If True, apply the fill_value, scale_factor, and add_offset metadata when
        decoding the data.
    """
    ds = xr.open_dataset(
        "reference://",
        mask_and_scale=mask_and_scale,
        engine="zarr",
        backend_kwargs={
            "storage_options": dict(
                fo=rfs,
                remote_protocol=remote_protocol,
            ),
            "consolidated": False,
        },
    )
    return ds
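
# Note: this read path relies on fsspec's "reference" filesystem and the zarr engine,
# so fsspec and zarr must be installed alongside xarray. The same rfs dict can also
# be dumped to JSON (json.dump(rfs, f)) and passed as a file path via fo=.
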


# tests
from numpy.testing import assert_array_equal
import h5py
import xarray as xr

# Define the path for the example HDF5 file
hdf5_file_path = "example4.h5"
dset_name = "example_dataset"

# Create some example data
data = np.random.randint(5, size=(100, 100))

# Create the HDF5 file and dataset
with h5py.File(hdf5_file_path, "w") as f:
    f.create_dataset("example_dataset", data=data)

# Reference the HDF5 dataset's raw bytes directly; offset=2048 is where the
# contiguous dataset's raw data begins in this freshly created file
rfs = create_rfs()
add_external_dataset_to_rfs(
    rfs,
    shape=(100, 100),
    chunk_len=10,
    dtype=data.dtype,
    dims=["time", "channels"],
    offset=2048,
    filepath=hdf5_file_path,
    units="uV",
    scale_factor=10,
    add_offset=100,
)

# Add channel labels inline as a base64-encoded dataset
channel_names = np.array([f"chan{x}" for x in range(100)])
add_internal_dataset_to_rfs(
    rfs,
    channel_names,
    dset_name="channels",
    dims=[
        "channels",
    ],
)

# A second reference filesystem over the same bytes, unchunked and without scaling
rfs2 = create_rfs()
add_external_dataset_to_rfs(
    rfs2,
    shape=(100, 100),
    dtype=data.dtype,
    dims=["a", "b"],
    offset=2048,
    filepath=hdf5_file_path,
)

# chunked and unchunked references should decode to the same values
assert_array_equal(
    read_xarray_from_rfs(rfs)["data"].data, read_xarray_from_rfs(rfs2)["data"].data
)

# values read through the reference filesystem should match h5py's own read
with h5py.File(hdf5_file_path, mode="r") as file:
    dset = file["/example_dataset"][:]
assert_array_equal(dset, read_xarray_from_rfs(rfs)["data"].data)

read_xarray_from_rfs(rfs)["data"]