Create a Parquet store for SARAH3 products in the form of NetCDF files using Kerchunk (after https://github.com/fsspec/kerchunk/pull/391)
from pathlib import Path
from functools import partial
from typing import Optional
import json
import multiprocessing

import typer
import xarray as xr
from rich import print
import fsspec
from fsspec.implementations.reference import LazyReferenceMapper
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

app = typer.Typer(
    no_args_is_help=True,
    add_completion=True,
    add_help_option=True,
    rich_markup_mode="rich",
    help="Create Parquet references",
)

def create_parquet_references(
    input_file: Path,
    output_parquet_store: Path,
    record_size: int = 1000,
):
    """Create a Parquet reference store for a single HDF5/NetCDF file."""
    output_parquet_store.mkdir(parents=True, exist_ok=True)
    filesystem = fsspec.filesystem("file")
    try:
        # Write references lazily to Parquet, `record_size` references per record
        output = LazyReferenceMapper.create(
            root=str(output_parquet_store),  # does not handle Path
            fs=filesystem,
            record_size=record_size,
        )
        single_zarr = SingleHdf5ToZarr(input_file, out=output)
        single_zarr.translate()
        output.flush()
    except json.JSONDecodeError as e:
        print(f"JSONDecodeError: {e}")
        print(f"Failed processing file: {input_file}")
        return

    return output_parquet_store

def create_single_parquet_references(
    input_file_path,
    output_directory,
    record_size,
):
    """Create a per-file Parquet reference store named after the input file."""
    filename = input_file_path.stem
    single_parquet_store = output_directory / f"{filename}.parquet"
    create_parquet_references(
        input_file_path,
        output_parquet_store=single_parquet_store,
        record_size=record_size,
    )

def create_multi_parquet_references(
    source_directory: Path,
    output_parquet_store: Path,
    output_directory: Path,
    pattern: str = "*.nc",
    record_size: int = 1000,
    workers: int = 4,
):
    """Create per-file Parquet reference stores in parallel, then combine them."""
    input_file_paths = list(source_directory.glob(pattern))
    if not input_file_paths:
        print(
            f"No files found in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]!"
        )
        return
    output_directory.mkdir(parents=True, exist_ok=True)

    with multiprocessing.Pool(processes=workers) as pool:
        print(f"Creating Parquet stores in [code]{output_directory}[/code]")
        partial_create_parquet_references = partial(
            create_single_parquet_references,
            output_directory=output_directory,
            record_size=record_size,
        )
        pool.map(partial_create_parquet_references, input_file_paths)
        print("Done creating single Parquet stores!")

    output_parquet_store = output_parquet_store.parent / (output_parquet_store.name + ".parquet")
    output_parquet_store.mkdir(parents=True, exist_ok=True)
    filesystem = fsspec.filesystem("file")
    try:
        output = LazyReferenceMapper.create(
            root=str(output_parquet_store),
            fs=filesystem,
            record_size=10,  # note: much smaller record size than the per-file stores
        )
        reference_pattern = "*.parquet"
        input_references = list(output_directory.glob(reference_pattern))
        input_references = list(map(str, input_references))
        multi_zarr = MultiZarrToZarr(
            input_references,
            remote_protocol="memory",
            concat_dims=["time"],
            out=output,
        )
        multi_zarr.translate()
        output.flush()
    except Exception as e:
        print(f"Failed creating the [code]{output_parquet_store}[/code] : {e}!")
        return
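
# Note on the two-step flow above: each NetCDF file first gets its own Parquet
# reference store (SingleHdf5ToZarr), and MultiZarrToZarr then concatenates
# those references along the "time" dimension into a single combined store.
# The stores hold essentially byte-range references into the original NetCDF
# files, not copies of the array data.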

@app.command(
    "reference",
    no_args_is_help=True,
    help="Create Parquet references to an HDF5/NetCDF file",
)
def reference(
    input_file: Path,
    output_directory: Optional[Path] = Path("."),
    record_size: int = 1000,
    dry_run: bool = False,
):
    """Create Parquet references from an HDF5/NetCDF file"""
    filename = input_file.stem
    output_parquet_store = output_directory / f"{filename}.parquet"
    if dry_run:
        print("[bold]Dry running operations that would be performed[/bold]:")
        print(
            f"> Creating Parquet references to [code]{input_file}[/code] in [code]{output_parquet_store}[/code]"
        )
        return  # Exit for a dry run

    create_parquet_references(
        input_file=input_file,
        output_parquet_store=output_parquet_store,
        record_size=record_size,
    )

@app.command(
    "reference-multi",
    no_args_is_help=True,
    help="Create Parquet references to multiple HDF5/NetCDF files",
)
def reference_multi(
    source_directory: Path,
    output_parquet_store: Path,
    output_directory: Optional[Path] = Path("."),
    pattern: str = "*.nc",
    record_size: int = 1000,
    workers: int = 4,
    dry_run: bool = False,
):
    """Create Parquet references from multiple HDF5/NetCDF files"""
    input_file_paths = list(source_directory.glob(pattern))
    if not input_file_paths:
        print("No files found in the source directory matching the pattern.")
        return
    if dry_run:
        print("[bold]Dry running operations that would be performed[/bold]:")
        print(
            f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
        )
        print(f"> Number of files matched : {len(input_file_paths)}")
        print(f"> Creating Parquet stores in [code]{output_directory}[/code]")
        return  # Exit for a dry run

    create_multi_parquet_references(
        source_directory=source_directory,
        output_parquet_store=output_parquet_store,
        output_directory=output_directory,
        pattern=pattern,
        record_size=record_size,
        workers=workers,
    )

@app.command(
    no_args_is_help=True,
    help="Select data from a Parquet references store",
)
def select(
    parquet_store: Path,
):
    """Select data from a Parquet store"""
    dataset = xr.open_dataset(
        str(parquet_store),  # does not handle Path
        engine="kerchunk",
        storage_options=dict(skip_instance_cache=True, remote_protocol="file"),
        # backend_kwargs={"consolidated": False},
    )
    print(dataset)


if __name__ == "__main__":
    app()
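
# Example usage, assuming the script is saved as sarah3_references.py
# (hypothetical name; file and store names below are placeholders):
#
#     python sarah3_references.py reference SARAH3_file.nc --record-size 1000
#     python sarah3_references.py reference-multi /path/to/netcdf/files combined --output-directory single_stores --workers 4
#     python sarah3_references.py select combined.parquet
#
# As an alternative to the `select` command, a minimal sketch of opening the
# combined Parquet store through fsspec's "reference" filesystem and the zarr
# engine (assumes locally stored NetCDF files):
#
#     import fsspec
#     import xarray as xr
#
#     # The "reference" filesystem reads Parquet-backed reference stores
#     # written by LazyReferenceMapper
#     # (see https://github.com/fsspec/kerchunk/pull/391)
#     filesystem = fsspec.filesystem(
#         "reference",
#         fo="combined.parquet",   # hypothetical combined store
#         remote_protocol="file",  # referenced chunks live on local disk
#         skip_instance_cache=True,
#     )
#     dataset = xr.open_dataset(
#         filesystem.get_mapper(""),
#         engine="zarr",
#         backend_kwargs={"consolidated": False},
#     )
#     print(dataset)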