@NikosAlexandris
Created November 6, 2023 20:18
Create a Parquet store for SARAH3 products in form of NetCDF files using Kerchunk
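The script below defines a small Typer command-line tool: it first writes one Kerchunk JSON reference per NetCDF file (in parallel, skipping files whose MD5 hash is unchanged), then combines the single references along the time dimension into a Parquet-backed reference store, and finally re-opens the store with xarray as a sanity check. A usage sketch, assuming the script is saved as kerchunk_to_parquet.py (the filename and data paths below are placeholders):

python kerchunk_to_parquet.py /data/sarah3/netcdf /data/sarah3/references --source-pattern "*.nc" --workers 8 --combined-reference sarah3_kerchunk.parq --dry-run

Drop --dry-run to actually create the single references and the combined Parquet store.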
import typer
import hashlib
from pathlib import Path
from functools import partial
import multiprocessing
from fsspec.implementations.reference import LazyReferenceMapper
import kerchunk
import kerchunk.df  # refs_to_dataframe lives in the kerchunk.df submodule
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
import ujson
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
from rich import print
VERBOSE_LEVEL_DEFAULT = 0
app = typer.Typer(
add_completion=True,
add_help_option=True,
rich_markup_mode="rich",
    help='Create Kerchunk references for NetCDF files and combine them into a Parquet store',
)
def generate_file_md5(file_path):
if not file_path.exists():
return None
with open(file_path, 'rb') as f:
file_content = f.read()
if not file_content:
return None
hash_value = hashlib.md5(file_content).hexdigest()
return hash_value
def create_single_reference(file_path, output_directory):
""" """
filename = file_path.stem
output_file = f"{output_directory}/{filename}.json"
hash_file = f"{output_directory}/{filename}.json.hash"
generated_hash = generate_file_md5(file_path)
local_fs = fsspec.filesystem('file')
if local_fs.exists(output_file) and local_fs.exists(hash_file):
print(f'Found a reference file \'{output_file}\' and a hash \'{hash_file}\'')
with local_fs.open(hash_file, 'r') as hf:
existing_hash = hf.read().strip()
if existing_hash == generated_hash:
pass
else:
print(f'Creating reference file \'{output_file}\' with hash \'{generated_hash}\'')
file_url = f"file://{file_path}"
with fsspec.open(file_url, mode='rb') as input_file:
h5chunks = SingleHdf5ToZarr(input_file, file_url, inline_threshold=0)
json = ujson.dumps(h5chunks.translate()).encode()
with local_fs.open(output_file, 'wb') as f:
f.write(json)
with local_fs.open(hash_file, 'w') as hf:
hf.write(generated_hash)
def create_kerchunk_reference(
source_directory: Path,
output_directory: Path,
pattern: str = "*.nc",
workers: int = 4,
dry_run: bool = False,
):
"""Reference local NetCDF files using Kerchunk"""
file_paths = list(source_directory.glob(pattern))
if not file_paths:
print("No files found in the source directory matching the pattern.")
return
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(file_paths)}")
print(f"> Creating single reference files to [code]{output_directory}[/code]")
return # Exit for a dry run
output_directory.mkdir(parents=True, exist_ok=True)
with multiprocessing.Pool(processes=workers) as pool:
        print(f'Creating single references to [code]{output_directory}[/code]')
partial_create_single_reference = partial(
create_single_reference, output_directory=output_directory
)
results = pool.map(partial_create_single_reference, file_paths)
def combine_kerchunk_references_to_parquet(
reference_directory: Path,
pattern: str = "*.json",
    combined_reference: Path = Path("combined_kerchunk.parq"),
dry_run: bool = False,
):
"""Combine multiple kerchunked datasets into a single logical parquet aggregate
dataset using fsspec.implementations.reference.ReferenceFileSystem"""
source_directory = Path(reference_directory)
reference_file_paths = list(reference_directory.glob(pattern))
reference_file_paths = list(map(str, reference_file_paths))
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(reference_file_paths)}")
print(f"> Writing combined reference file to [code]{combined_reference}[/code]")
return # Exit for a dry run
# Create LazyReferenceMapper to pass to MultiZarrToZarr
filesystem = fsspec.filesystem("file")
combined_reference.mkdir(parents=True, exist_ok=True)
output_lazy = LazyReferenceMapper.create(1000, str(combined_reference), filesystem)
# Combine single references
    print(f'Combining single references to [code]{combined_reference}[/code]')
mzz = MultiZarrToZarr(
reference_file_paths,
remote_protocol="file",
concat_dims=["time"],
identical_dims=["lat", "lon"],
out=output_lazy,
)
multifile_kerchunk = mzz.translate()
# Write all non-full reference batches
output_lazy.flush()
return multifile_kerchunk
@app.callback()
def main(
source_directory: Path,
reference_directory: Path,
source_pattern: str = '*.nc',
reference_pattern: str = '*.json',
workers: int = 4,
dry_run: bool = False,
    combined_reference: Path = Path("combined_kerchunk.parq"),
):
""" """
create_kerchunk_reference(
source_directory=source_directory,
pattern=source_pattern,
output_directory=reference_directory,
workers=workers,
dry_run=dry_run,
)
multifile_kerchunk = combine_kerchunk_references_to_parquet(
reference_directory=reference_directory, # directory with single references
pattern=reference_pattern,
combined_reference=combined_reference,
dry_run=dry_run,
)
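    # Persist the combined reference set as a Parquet store, then re-open it to verify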
kerchunk.df.refs_to_dataframe(multifile_kerchunk, str(combined_reference))
filesystem = fsspec.implementations.reference.ReferenceFileSystem(
fo=str(combined_reference),
remote_protocol="file",
target_protocol="file",
lazy=True,
)
print(f'Reading from the Parquet storage...')
ds = xr.open_dataset(
filesystem.get_mapper(),
engine="zarr",
backend_kwargs={"consolidated": False},
)
print(ds)
if __name__ == "__main__":
typer.run(main)
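A revised variant of the same script follows. It exposes the LazyReferenceMapper cache_size as a command-line option and constructs the mapper directly instead of calling LazyReferenceMapper.create().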
import typer
import hashlib
from pathlib import Path
from functools import partial
import multiprocessing
from fsspec.implementations.reference import LazyReferenceMapper
import kerchunk
import kerchunk.df  # refs_to_dataframe lives in the kerchunk.df submodule
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
import ujson
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
from rich import print
VERBOSE_LEVEL_DEFAULT = 0
app = typer.Typer(
add_completion=True,
add_help_option=True,
rich_markup_mode="rich",
    help='Create Kerchunk references for NetCDF files and combine them into a Parquet store',
)
def generate_file_md5(file_path):
if not file_path.exists():
return None
with open(file_path, 'rb') as f:
file_content = f.read()
if not file_content:
return None
hash_value = hashlib.md5(file_content).hexdigest()
return hash_value
def create_single_reference(file_path, output_directory):
""" """
filename = file_path.stem
output_file = f"{output_directory}/{filename}.json"
hash_file = f"{output_directory}/{filename}.json.hash"
generated_hash = generate_file_md5(file_path)
local_fs = fsspec.filesystem('file')
if local_fs.exists(output_file) and local_fs.exists(hash_file):
print(f'Found a reference file \'{output_file}\' and a hash \'{hash_file}\'')
with local_fs.open(hash_file, 'r') as hf:
existing_hash = hf.read().strip()
if existing_hash == generated_hash:
pass
else:
print(f'Creating reference file \'{output_file}\' with hash \'{generated_hash}\'')
file_url = f"file://{file_path}"
with fsspec.open(file_url, mode='rb') as input_file:
h5chunks = SingleHdf5ToZarr(input_file, file_url, inline_threshold=0)
json = ujson.dumps(h5chunks.translate()).encode()
with local_fs.open(output_file, 'wb') as f:
f.write(json)
with local_fs.open(hash_file, 'w') as hf:
hf.write(generated_hash)
def create_kerchunk_reference(
source_directory: Path,
output_directory: Path,
pattern: str = "*.nc",
workers: int = 4,
dry_run: bool = False,
):
"""Reference local NetCDF files using Kerchunk"""
file_paths = list(source_directory.glob(pattern))
if not file_paths:
print("No files found in the source directory matching the pattern.")
return
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(file_paths)}")
print(f"> Creating single reference files to [code]{output_directory}[/code]")
return # Exit for a dry run
output_directory.mkdir(parents=True, exist_ok=True)
with multiprocessing.Pool(processes=workers) as pool:
        print(f'Creating single references to [code]{output_directory}[/code]')
partial_create_single_reference = partial(
create_single_reference, output_directory=output_directory
)
results = pool.map(partial_create_single_reference, file_paths)
def combine_kerchunk_references_to_parquet(
reference_directory: Path,
pattern: str = "*.json",
    combined_reference: Path = Path("combined_kerchunk.parq"),
cache_size: int = 1000,
dry_run: bool = False,
):
"""Combine multiple kerchunked datasets into a single logical parquet aggregate
dataset using fsspec.implementations.reference.ReferenceFileSystem"""
source_directory = Path(reference_directory)
reference_file_paths = list(reference_directory.glob(pattern))
reference_file_paths = list(map(str, reference_file_paths))
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(reference_file_paths)}")
print(f"> Writing combined reference file to [code]{combined_reference}[/code]")
return # Exit for a dry run
# Create LazyReferenceMapper to pass to MultiZarrToZarr
filesystem = fsspec.filesystem("file")
combined_reference.mkdir(parents=True, exist_ok=True)
output_lazy = LazyReferenceMapper(
root=str(combined_reference),
fs=filesystem,
cache_size=cache_size,
)
# Combine single references
    print(f'Combining single references to [code]{combined_reference}[/code]')
mzz = MultiZarrToZarr(
reference_file_paths,
remote_protocol="file",
concat_dims=["time"],
identical_dims=["lat", "lon"],
out=output_lazy,
)
multifile_kerchunk = mzz.translate()
# Write all non-full reference batches
output_lazy.flush()
return multifile_kerchunk
@app.callback()
def main(
source_directory: Path,
reference_directory: Path,
source_pattern: str = '*.nc',
reference_pattern: str = '*.json',
workers: int = 4,
    combined_reference: Path = Path("combined_kerchunk.parq"),
cache_size: int = 1000,
dry_run: bool = False,
):
""" """
create_kerchunk_reference(
source_directory=source_directory,
pattern=source_pattern,
output_directory=reference_directory,
workers=workers,
dry_run=dry_run,
)
multifile_kerchunk = combine_kerchunk_references_to_parquet(
reference_directory=reference_directory, # directory with single references
pattern=reference_pattern,
combined_reference=combined_reference,
cache_size=cache_size,
dry_run=dry_run,
)
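    # Persist the combined reference set as a Parquet store, then re-open it to verify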
kerchunk.df.refs_to_dataframe(multifile_kerchunk, str(combined_reference))
filesystem = fsspec.implementations.reference.ReferenceFileSystem(
fo=str(combined_reference),
remote_protocol="file",
target_protocol="file",
lazy=True,
)
print(f'Reading from the Parquet storage...')
ds = xr.open_dataset(
filesystem.get_mapper(),
engine="zarr",
backend_kwargs={"consolidated": False},
)
print(ds)
if __name__ == "__main__":
typer.run(main)
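Once the combined Parquet store exists, it can be read back without this script. A minimal reading sketch, assuming the store was written to combined_kerchunk.parq on the local filesystem and that the aggregate dataset carries a time dimension (as the concat_dims setting above implies):

from fsspec.implementations.reference import ReferenceFileSystem
import xarray as xr

# Open the Parquet-backed reference store lazily;
# the references point to byte ranges inside the original NetCDF files,
# so no data are copied and nothing is read until values are requested.
filesystem = ReferenceFileSystem(
    fo="combined_kerchunk.parq",
    remote_protocol="file",
    target_protocol="file",
    lazy=True,
)
ds = xr.open_dataset(
    filesystem.get_mapper(),
    engine="zarr",
    backend_kwargs={"consolidated": False},
)
print(ds.isel(time=0))  # load and show the first time step only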