@ivirshup
Created October 18, 2023 18:14
{
"cells": [
{
"cell_type": "markdown",
"id": "1735ef1a-53a1-4c0a-967a-d7fa690ad765",
"metadata": {},
"source": [
"# variable chunking kerchunk demo"
]
},
{
"cell_type": "markdown",
"id": "4863ea04-353f-4af4-97ea-3229b73aa2d3",
"metadata": {},
"source": [
"Demo of some things we can do with variable length chunks"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "262def89-314d-4d4c-9d57-0175ba868129",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"import json\n",
"\n",
"import zarr\n",
"import fsspec\n",
"from scipy import sparse\n",
"\n",
"from kerchunk.zarr import single_zarr\n",
"from kerchunk.combine import merge_vars, concatenate_arrays\n",
"\n",
"import anndata as ad, numpy as np, pandas as pd\n",
"from anndata.experimental import write_elem, read_elem, sparse_dataset"
]
},
{
"cell_type": "markdown",
"id": "66ddaab3-b2d4-43fa-807d-642b155da10d",
"metadata": {},
"source": [
"## Concatenate sparse arrays"
]
},
{
"cell_type": "markdown",
"id": "163511e7-468b-464e-bc98-d689d5db1093",
"metadata": {},
"source": [
"Methods for writing CSR sparse arrays with variable length chunks"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "8288fab2-a2e2-455c-80a5-eb37a81638f7",
"metadata": {},
"outputs": [],
"source": [
"def _gen_indptr_chunk_sizes(l: int, max: int) -> list[int]:\n",
" \"\"\"Pads first chunk with an extra point to make up for n+1 total elements\"\"\"\n",
" # TODO: consider whehter first or last chunk should get an extra element\n",
" chunk_sizes = [min(l, max + 1)]\n",
" rem = l - (max + 1)\n",
" while rem > max:\n",
" chunk_sizes.append(max)\n",
" rem -= max\n",
" if rem != 0:\n",
" chunk_sizes.append(rem)\n",
" return chunk_sizes"
]
},
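{
"cell_type": "markdown",
"id": "3f2b1c0d-9e8a-4b7c-a6d5-4e3f2a1b0c9d",
"metadata": {},
"source": [
"Quick sanity check: the sizes returned by `_gen_indptr_chunk_sizes` should always sum to `l`, with the first chunk one element longer so that it absorbs the trailing `n+1` entry of `indptr`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7c6d5e4f-3a2b-4c1d-8e9f-0a1b2c3d4e5f",
"metadata": {},
"outputs": [],
"source": [
"# The first chunk gets the extra element; together the chunks cover all of l.\n",
"assert _gen_indptr_chunk_sizes(21, 10) == [11, 10]\n",
"assert sum(_gen_indptr_chunk_sizes(10_001, 1_000)) == 10_001"
]
},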
{
"cell_type": "code",
"execution_count": 3,
"id": "a3c11e9c-99b9-4346-bed4-515474f7a2f6",
"metadata": {},
"outputs": [],
"source": [
"def write_sparse_w_variable_chunks(parent: zarr.Group, key: str, X: sparse.csr_matrix, *, max_elements=10_000):\n",
" group = parent.create_group(key)\n",
" indptr_chunk_sizes = _gen_indptr_chunk_sizes(len(X.indptr), max_elements)\n",
"\n",
" chunk_sizes = []\n",
" indptr_offset = 0\n",
" prev = 0\n",
" for i in indptr_chunk_sizes:\n",
" indptr_offset += i\n",
" curr = X.indptr[indptr_offset - 1]\n",
" chunk_sizes.append(curr - prev)\n",
" prev = curr\n",
"\n",
" group.create_dataset(\"indptr\", data=X.indptr, chunks=(indptr_chunk_sizes,))\n",
" group.create_dataset(\"indices\", data=X.indices, chunks=(chunk_sizes,))\n",
" group.create_dataset(\"data\", data=X.data, chunks=(chunk_sizes,))\n",
"\n",
" group.attrs[\"encoding-type\"] = \"csr_matrix\"\n",
" group.attrs[\"encoding-version\"] = \"0.1.0\"\n",
" group.attrs[\"shape\"] = X.shape"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "05133552-8966-4dcc-b629-f6bc1976ad2e",
"metadata": {},
"outputs": [],
"source": [
"def concatenate_zarr_arrays(arrays: list[zarr.Array]) -> dict:\n",
" references = [single_zarr(array.store.dir_path(array.path)) for array in arrays]\n",
" return concatenate_arrays(references)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "4d2c0fe4-f0fd-4b00-af00-6c456583dc8a",
"metadata": {},
"outputs": [],
"source": [
"def concatenate_zarr_csr_arrays(arrays: list[zarr.Group]) -> dict:\n",
" # Group metadata\n",
" shapes = [array.attrs[\"shape\"] for array in arrays]\n",
" merged_shape = [sum(s[0] for s in shapes), shapes[0][1]]\n",
" metadata = {\"encoding-type\": \"csr_matrix\", \"encoding-version\": \"0.1.0\", \"shape\": merged_shape}\n",
"\n",
" # Concatenating indices & data\n",
" references = [single_zarr(array.store.dir_path(array.path)) for array in arrays]\n",
" data_refs = concatenate_arrays(references, path=\"data\")\n",
" indices_refs = concatenate_arrays(references, path=\"indices\")\n",
"\n",
" # Concatenating indptr in memory\n",
" indptrs = [array[\"indptr\"][:] for array in arrays]\n",
"\n",
" new_indptrs = []\n",
" pos = indptrs[0][-1]\n",
" for indptr in indptrs[1:]:\n",
" new_indptr = indptr[1:]\n",
" new_indptr += pos\n",
" new_indptrs.append(new_indptr)\n",
" pos = new_indptr[-1]\n",
" new_indptr = np.concatenate([indptrs[0]] + new_indptrs)\n",
" new_indptr_zarr = zarr.array(new_indptr)\n",
" \n",
" indptr_refs = {\n",
" \"version\": 1,\n",
" \"refs\": {f\"indptr/{k}\": v for k, v in new_indptr_zarr.store.items()}\n",
" }\n",
"\n",
" # Merging into group\n",
" merged = merge_vars([data_refs, indices_refs, indptr_refs])\n",
" # Setting .zattrs\n",
" merged[\"refs\"][\".zattrs\"] = json.dumps(metadata)\n",
" \n",
" return merged"
]
},
{
"cell_type": "markdown",
"id": "4b0d9947-a154-4749-8595-ccc7c740fe44",
"metadata": {},
"source": [
"## Example:"
]
},
{
"cell_type": "markdown",
"id": "65b96d81-d12c-42a2-9c2e-e0a683220aaf",
"metadata": {},
"source": [
"Make data"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "859772ad-1813-442c-b774-137e655d0c0f",
"metadata": {},
"outputs": [],
"source": [
"TEST_DATA_DIR = Path(\"test_var_chunks\")\n",
"TEST_DATA_DIR.mkdir(exist_ok=True)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "de6ceafd-8bba-4d56-92b2-715298e3dabd",
"metadata": {},
"outputs": [],
"source": [
"csrs: list[sparse.csr_matrix] = []\n",
"pths: list[Path] = []\n",
"\n",
"for max_elem in [1000, 2000, 500]:\n",
" csr = sparse.random(10_000, 10_00, format=\"csr\", density=0.1, random_state=np.random.default_rng())\n",
" pth = TEST_DATA_DIR / f\"csr_{max_elem}.zarr\"\n",
" z = zarr.open(pth, \"w\")\n",
" write_sparse_w_variable_chunks(z, \"x\", csr, max_elements=max_elem)\n",
" assert (csr != ad.experimental.read_elem(z[\"x\"])).nnz == 0\n",
" csrs.append(csr)\n",
" pths.append(pth)\n",
"\n",
"csr_groups = [zarr.open(pth)[\"x\"] for pth in pths]\n",
"expected = sparse.vstack(csrs)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "9681b7f6-803a-432a-8291-e9d88b272329",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 83.4 ms, sys: 37 ms, total: 120 ms\n",
"Wall time: 120 ms\n"
]
}
],
"source": [
"%%time\n",
"mapper = fsspec.get_mapper(\"reference://\", fo=concatenate_zarr_csr_arrays(csr_groups))\n",
"result_group = zarr.open(mapper)\n",
"result = read_elem(result_group)"
]
},
{
"cell_type": "markdown",
"id": "a7092ed3-650f-4190-a08b-4de7ae082535",
"metadata": {},
"source": [
"Checking that the result is good:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "7b0e31cf-23b3-4cb2-b542-c83dad7f0968",
"metadata": {},
"outputs": [],
"source": [
"assert (result != expected).nnz == 0"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "77c7e4d9-7532-43a2-919a-83c3c0412c04",
"metadata": {},
"outputs": [],
"source": [
"assert (sparse_dataset(result_group)[900:1500] != expected[900:1500]).nnz == 0"
]
},
{
"cell_type": "markdown",
"id": "bdac0d18-87ec-41e4-bc4f-21077fb4cbbb",
"metadata": {},
"source": [
"## Dask"
]
},
{
"cell_type": "markdown",
"id": "66521f12-c27d-4752-908d-81290e7cdde4",
"metadata": {},
"source": [
"Loading one of the arrays into dask:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "0bdb3b7e-4fb9-46f3-90b9-673df1c04c43",
"metadata": {},
"outputs": [],
"source": [
"import dask.array as da"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "16387b3d-cf16-4988-bc2f-752ed506f01c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" <td>\n",
" <table style=\"border-collapse: collapse;\">\n",
" <thead>\n",
" <tr>\n",
" <td> </td>\n",
" <th> Array </th>\n",
" <th> Chunk </th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" \n",
" <tr>\n",
" <th> Bytes </th>\n",
" <td> 22.89 MiB </td>\n",
" <td> 1.53 MiB </td>\n",
" </tr>\n",
" \n",
" <tr>\n",
" <th> Shape </th>\n",
" <td> (3000000,) </td>\n",
" <td> (200213,) </td>\n",
" </tr>\n",
" <tr>\n",
" <th> Dask graph </th>\n",
" <td colspan=\"2\"> 35 chunks in 2 graph layers </td>\n",
" </tr>\n",
" <tr>\n",
" <th> Data type </th>\n",
" <td colspan=\"2\"> float64 numpy.ndarray </td>\n",
" </tr>\n",
" </tbody>\n",
" </table>\n",
" </td>\n",
" <td>\n",
" <svg width=\"170\" height=\"75\" style=\"stroke:rgb(0,0,0);stroke-width:1\" >\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"0\" y1=\"0\" x2=\"120\" y2=\"0\" style=\"stroke-width:2\" />\n",
" <line x1=\"0\" y1=\"25\" x2=\"120\" y2=\"25\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"0\" y1=\"0\" x2=\"0\" y2=\"25\" style=\"stroke-width:2\" />\n",
" <line x1=\"4\" y1=\"0\" x2=\"4\" y2=\"25\" />\n",
" <line x1=\"12\" y1=\"0\" x2=\"12\" y2=\"25\" />\n",
" <line x1=\"20\" y1=\"0\" x2=\"20\" y2=\"25\" />\n",
" <line x1=\"28\" y1=\"0\" x2=\"28\" y2=\"25\" />\n",
" <line x1=\"35\" y1=\"0\" x2=\"35\" y2=\"25\" />\n",
" <line x1=\"47\" y1=\"0\" x2=\"47\" y2=\"25\" />\n",
" <line x1=\"56\" y1=\"0\" x2=\"56\" y2=\"25\" />\n",
" <line x1=\"71\" y1=\"0\" x2=\"71\" y2=\"25\" />\n",
" <line x1=\"81\" y1=\"0\" x2=\"81\" y2=\"25\" />\n",
" <line x1=\"85\" y1=\"0\" x2=\"85\" y2=\"25\" />\n",
" <line x1=\"89\" y1=\"0\" x2=\"89\" y2=\"25\" />\n",
" <line x1=\"93\" y1=\"0\" x2=\"93\" y2=\"25\" />\n",
" <line x1=\"95\" y1=\"0\" x2=\"95\" y2=\"25\" />\n",
" <line x1=\"99\" y1=\"0\" x2=\"99\" y2=\"25\" />\n",
" <line x1=\"104\" y1=\"0\" x2=\"104\" y2=\"25\" />\n",
" <line x1=\"108\" y1=\"0\" x2=\"108\" y2=\"25\" />\n",
" <line x1=\"111\" y1=\"0\" x2=\"111\" y2=\"25\" />\n",
" <line x1=\"115\" y1=\"0\" x2=\"115\" y2=\"25\" />\n",
" <line x1=\"120\" y1=\"0\" x2=\"120\" y2=\"25\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"0.0,0.0 120.0,0.0 120.0,25.412616514582485 0.0,25.412616514582485\" style=\"fill:#8B4903A0;stroke-width:0\"/>\n",
"\n",
" <!-- Text -->\n",
" <text x=\"60.000000\" y=\"45.412617\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" >3000000</text>\n",
" <text x=\"140.000000\" y=\"12.706308\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" transform=\"rotate(0,140.000000,12.706308)\">1</text>\n",
"</svg>\n",
" </td>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"dask.array<array, shape=(3000000,), dtype=float64, chunksize=(200213,), chunktype=numpy.ndarray>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"((100181,\n",
" 100471,\n",
" 99830,\n",
" 100077,\n",
" 99938,\n",
" 99728,\n",
" 100113,\n",
" 99563,\n",
" 100031,\n",
" 100068,\n",
" 199946,\n",
" 200213,\n",
" 199483,\n",
" 200164,\n",
" 200194,\n",
" 49735,\n",
" 50081,\n",
" 49855,\n",
" 50163,\n",
" 49897,\n",
" 49824,\n",
" 50118,\n",
" 49885,\n",
" 49916,\n",
" 50511,\n",
" 50256,\n",
" 49947,\n",
" 49988,\n",
" 49952,\n",
" 49959,\n",
" 49886,\n",
" 50016,\n",
" 49966,\n",
" 50202,\n",
" 49843),)"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"zarr_array = result_group[\"data\"]\n",
"\n",
"result_data = da.from_array(\n",
" zarr_array,\n",
" chunks=tuple(tuple(c) if isinstance(c, list) else c for c in zarr_array.chunks)\n",
")\n",
"\n",
"display(result_data)\n",
"display(result_data.chunks)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "0072ae0b-d74b-4b81-99a8-f24d3495ad4e",
"metadata": {},
"outputs": [],
"source": [
"np.testing.assert_array_equal(expected.data, result_data.compute())"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:variable-chunks]",
"language": "python",
"name": "conda-env-variable-chunks-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@NikosAlexandris

NikosAlexandris commented Oct 30, 2023

Thank you @ivirshup!

Continuing from fsspec/kerchunk#374 (comment):

So, indptr holds offsets into the indices and data arrays. If ind stands for index, what does ptr stand for? Pointer(s)? Also, it took me a while to track down the definition of CSR (compressed sparse row). Maybe some future documentation could spell out all the acronyms.
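For reference, here is that relationship on a tiny scipy matrix (an illustrative sketch, separate from the notebook above):

from scipy import sparse
import numpy as np

m = sparse.csr_matrix(np.array([
    [1, 0, 2, 0],
    [0, 0, 0, 0],
    [0, 3, 0, 4],
]))
print(m.indptr)   # [0 2 2 4] -> row i is stored in data[indptr[i]:indptr[i+1]]
print(m.indices)  # [0 2 1 3] -> column index of each stored value
print(m.data)     # [1 2 3 4]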

I am trying to adapt your example to my use case, which I think is an easier one. I work with series of netCDF files that have varying chunking layouts (by mistake, as far as I can tell, rather than by the producer's intention). For example, some years are chunked [1, 1300, 1300] and others [1, 2600, 2600], where the spatial resolution is 2600 by 2600. I still need to do some reading to understand the critical part I need to use: it is the merge_vars() function, right?

@ivirshup

Ah, I misunderstood what you were asking about. This example is very specific to sparse arrays.

I'm actually not sure if our proposal here would make your case possible, unless those netCDFs are uncompressed. Our proposal allows irregular chunking across individual dimensions. So for kerchunk that means if we have arrays like:

[image: arrays with differing chunk sizes along axis 0]

We can combine those across axis 0 to get something like:

[image: the arrays combined along axis 0, giving one irregular list of chunk sizes per dimension]
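In dask terms, that axis-0 combination can be sketched like this (an illustrative example using dask's chunk tuples, not a kerchunk API): the result still has one list of chunk sizes per dimension, which is exactly what the variable-chunking proposal needs to encode.

import dask.array as da

a = da.zeros((7, 5), chunks=((4, 3), (5,)))      # irregular chunks along axis 0
b = da.zeros((6, 5), chunks=((2, 2, 2), (5,)))   # different irregular chunking

stacked = da.concatenate([a, b], axis=0)
print(stacked.chunks)  # ((4, 3, 2, 2, 2), (5,)) -- still one size list per axis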

But we can't represent the array created by concatenating these along axis 1, e.g.:

[image: the arrays concatenated along axis 1, where the chunk boundaries along axis 0 no longer line up]

@martindurant, would you agree with this assessment?

@NikosAlexandris

@ivirshup Great visualisation == Clearest explanation! Thank you.

@NikosAlexandris

My use case is unequal chunking shapes across data within the same series of products. For example, the chunking shape of:

  • NetCDF 20200101 is [1, 2600, 2600] which can look like [1] [1, 2, 3, 4, .., 2600] [1, 2, 3, 4, .., 2600]
  • NetCDF 20230101 is [1, 1300, 1300] which can look like [1] [1, 2, .., 1300] [1, 2, .., 1300]

These obviously cannot be combined with the current implementation. What I thought might be possible is to virtually expand the chunking shapes as required (in this example, expand the second NetCDF file) so that they all match, without having to rechunk the data beforehand. Specifically, let them virtually have the same chunking shape [1, 2600, 2600], something like the following:

  • NetCDF 20200101 : [1] [1, 2, 3, 4, .., 2600] [1, 2, 3, 4, .., 2600]
  • NetCDF 20230101 : [1] [1, 2, .., 1300, ----] [1, 2, .., 1300, ----]

Hence, the latter two would fit and could be combined. Right?

@martindurant

@martindurant, would you agree with this assessment?

Yes, I agree with the diagrams' conclusion.

Let them virtually have the same chunking shape [1, 2600, 2600], something like the following:
Hence, the latter two would fit and could be combined. Right?

This does sound doable, but not as we currently have things designed. There is some precedent in https://github.com/d70-t/preffs, which concatenates multiple buffers at the byte level for each chunk. Again, as in https://github.com/orgs/zarr-developers/discussions/52#discussioncomment-7435293, this is something that could be done in the kerchunk/referenceFS layer, but we need to be mindful of where the limit of our scope should be.

You can do this currently with dask.array reading from zarr(-like) stores, where you can declare any chunk size over the input zarr(s). I suppose that if you construct dask.arrays this way and apply a set of concat/stack calls, you can eventually create a valid xarray.
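A minimal sketch of that dask route (the store paths and variable name here are hypothetical): declare the same chunking over both inputs, regardless of how they are chunked on disk, then concatenate along the time axis.

import dask.array as da
import zarr

# hypothetical stores, chunked on disk as (1, 2600, 2600) and (1, 1300, 1300)
a = da.from_array(zarr.open("sarah3_2020.zarr")["SIS"], chunks=(1, 1300, 1300))
b = da.from_array(zarr.open("sarah3_2023.zarr")["SIS"], chunks=(1, 1300, 1300))

combined = da.concatenate([a, b], axis=0)  # time axis

From there, a set of concat/stack calls and coordinate assignments can turn the result into a valid xarray object, as suggested above.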

@NikosAlexandris

Thank you @martindurant. Here is some argumentation for why such a "feature" would be useful.

Everyone has specific needs, and eventually we all make mistakes. The same applies to DWD in producing their SARAH3 products -- my best guess is that their workflow does not treat the complete time series archive all at once, and for a good reason: adding a year of data to an archive should not require reproducing the whole archive. Nonetheless, deciding on a "good" chunking shape does not seem to be an easy decision. There are numerous articles and a few different algorithms for determining an optimal chunking shape (and overall layout); I have in mind, for example, the algorithm by Russ Rew. Likely, at some step of the SARAH3 production workflow, a suggested chunking shape was adopted as the best one? Or something similar happened between the production of the archive data and what they call interim or operational data, all part of the same product yet storing observations from the past versus the near-past/present. This ends up producing a mixture of different chunking shapes within the same time series. I wouldn't be surprised if more of these big data archives "suffer" from the same difficulties regarding a good chunking strategy. Rechunking is probably the right way to go, yet it is so costly in time and resources. Having an option to work with such "mixed" chunkings would make it rather faster to run some tests with the data.

Well, this is the best argument I can come up with at the moment.

@NikosAlexandris

Well, it's all "motivated" already in https://github.com/orgs/zarr-developers/discussions/52#discussioncomment-7435293 :-)

@tinaok

tinaok commented Nov 17, 2023

Hello,
I'm trying to apply your method to NetCDF files.

!pip install git+https://github.com/ivirshup/kerchunk.git@concat-varchunks


# Create example files split along lon
import xarray as xr
ds = xr.tutorial.load_dataset('air_temperature')
ds.isel(lon=slice(0,5)).to_netcdf('lon0.nc')
ds.isel(lon=slice(5,10)).to_netcdf('lon1.nc')
ds.isel(lon=slice(10,13)).to_netcdf('lon2.nc')


import glob
dir_url = "/Users/todaka/"
file_pattern = "/data_xios/lon*.nc"
file_paths = glob.glob(dir_url + file_pattern)
file_paths=file_paths[0:2]

import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
def translate_dask(file):
    url = "file://" + file
    with fsspec.open(url) as inf:
        h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=100)
        return h5chunks.translate()
result=[translate_dask(file) for file in file_paths]

from kerchunk.combine import MultiZarrToZarr

mzz = MultiZarrToZarr(
    result,
    concat_dims=["lon"],
)
a = mzz.translate()
xr.open_dataset(a,engine='kerchunk',chunks={})
result = result_indask.compute()

from kerchunk.combine import MultiZarrToZarr
mzz = MultiZarrToZarr(
    result,
    concat_dims=["y"],
)
a = mzz.translate()
xr.open_dataset(a,engine='kerchunk',chunks={})

I tried to apply your function

concatenate_zarr_csr_arrays(result)

But I get this error:

AttributeError: 'dict' object has no attribute 'attrs'

What should I do if I want to combine not zarr stores but NetCDF files using your approach?

Thank you very much for your help.
