# Gist by @NikosAlexandris (NikosAlexandris/1fbc82fd0578ce96a4d39cbffa4ed584),
# created November 3, 2023 19:33.
#
# Determine optimal chunk shape for a 3D array
import math
import operator
import numpy as np
from itertools import product
from functools import reduce
import typer
from pvgisprototype.cli.typer_parameters import OrderCommands
from pydantic import BaseModel, ValidationError, conlist
from pydantic import ConfigDict
from pydantic import field_validator
from typing import Annotated
from typing import List
class VariableShapeModel(BaseModel):
    """Validated shape of a variable: a list of 2 to 4 integers.

    The shape may be supplied either as a list of integers or as a
    comma-separated string such as ``"365,180,360"``.
    """

    # Length limits (2 to 4 items) are enforced by the constrained list type.
    variable_shape: conlist(int, min_length=2, max_length=4)

    # mode="before" hands the raw input to the validator. With the default
    # "after" mode a string is rejected by the list validation first, so the
    # string-parsing branch below could never run.
    @field_validator('variable_shape', mode='before')
    @classmethod
    def validate_shape(cls, value):
        """Convert a comma-separated string into a list of integers.

        Any non-string input is returned unchanged and left to the field's
        own validation.

        Raises
        ------
        ValueError
            If an item of the comma-separated string is not an integer.
        """
        if isinstance(value, str):
            try:
                return [int(item) for item in value.split(',')]
            except ValueError as error:
                raise ValueError("Invalid integer in the list") from error
        return value
# Typer application object for the command-line interface.
# `OrderCommands` comes from pvgisprototype.cli.typer_parameters; presumably
# it controls the order in which commands are listed -- confirm there.
app = typer.Typer(
    cls=OrderCommands,
    add_completion=True,
    add_help_option=True,
    rich_markup_mode="rich",
    help='Determine a good chunk layout',  # plain string: no placeholders
)
def binlist(n, width=0):
    """Return the list of bits that represent a non-negative integer.

    Parameters
    ----------
    n:
        Non-negative integer to convert.
    width:
        Minimum number of bits in the returned list; shorter binary
        representations are left-padded with zeros (default 0). Longer
        representations are returned in full.

    Returns
    -------
    List of 0/1 integers, most significant bit first.

    Raises
    ------
    ValueError
        If ``n`` is negative.
    """
    # bin() prefixes negative numbers with "-0b", which the slicing below
    # cannot handle; fail with a clear message instead of a parsing error.
    if n < 0:
        raise ValueError("n must be a non-negative integer")
    return [int(bit) for bit in bin(n)[2:].zfill(width)]
def perturb_shape(shape, on_bits):
    """Return a copy of shape with 1 added wherever on_bits has a 1 bit.

    shape -- list of variable dimension sizes
    on_bits -- non-negative integer less than 2**len(shape); its binary
               digits, most significant first, are matched to the
               dimensions of shape in order
    """
    # One 0/1 increment per dimension, zero-padded to the rank of the shape.
    increments = binlist(on_bits, len(shape))
    # map() stops at the shorter input, so surplus bits are ignored.
    return list(map(operator.add, shape, increments))
def calculate_ideal_number_of_chunks(variable_shape: List[int], float_size: int, chunk_size: int) -> float:
    """Calculate the ideal number of chunks for a variable.

    Parameters
    ----------
    variable_shape:
        Sizes of the variable's dimensions.
    float_size:
        Size of one value in bytes.
    chunk_size:
        Target chunk size in bytes.

    Returns
    -------
    Total number of values in the variable divided by the number of values
    that fit in one chunk (at least one value per chunk).
    """
    # A chunk always holds at least one value, even if chunk_size < float_size.
    ideal_number_of_values = max(chunk_size // float_size, 1)
    # math.prod works on arbitrary-precision Python integers, so very large
    # shapes cannot silently wrap around a fixed-width integer type.
    return math.prod(variable_shape) / ideal_number_of_values
def adjust_first_dimension(variable_shape: List[int], number_of_chunks_per_axis: float) -> tuple[float, float]:
    """Size the first dimension of the chunk shape.

    The first dimension is divided by the square of the number of chunks
    per axis. If that would make the chunk thinner than one element, the
    chunk size is set to 1 and the number of chunks per axis is rescaled.

    Parameters
    ----------
    variable_shape:
        Sizes of the variable's dimensions; only the first one is used.
    number_of_chunks_per_axis:
        Current estimate of the number of chunks along each axis.

    Returns
    -------
    A pair ``(first_dimension_chunk_size, number_of_chunks_per_axis)``
    where the second item is the possibly rescaled number of chunks per axis.
    """
    first_dimension_ratio = variable_shape[0] / (number_of_chunks_per_axis ** 2)
    if first_dimension_ratio < 1:
        # The chunk cannot be thinner than one element along the first axis.
        return 1.0, number_of_chunks_per_axis / math.sqrt(first_dimension_ratio)
    return variable_shape[0] // (number_of_chunks_per_axis ** 2), number_of_chunks_per_axis
def determine_chunking_shape(
    variable_shape: List[int],
    float_size: int = 4,
    chunk_size: int = 4096,
) -> List[int]:
    """Determine optimal chunk shape for a 3D array.

    Based on Python code and algorithm developed by Russ Rew, posted at
    "Chunking Data: Choosing Shapes",
    https://www.unidata.ucar.edu/blog_content/data/2013/chunk_shape_3D.py
    accessed on 31 October 2023

    A first estimate of the chunk shape is computed, then every combination
    of "add 1 to this dimension" is tried and the candidate whose size in
    bytes is largest without exceeding ``chunk_size`` is kept.

    Parameters
    ----------
    variable_shape:
        The shape of the 3D array, as a sequence of dimension sizes. The
        first estimate uses the 3D formula (fourth root of the number of
        chunks).
    float_size:
        Size of a float value in bytes
    chunk_size:
        Maximum allowable chunk size in bytes which cannot be greater
        than the size of the physical block

    Returns
    -------
    Optimal chunk shape as a list of integers

    Raises
    ------
    ValueError
        If any dimension of ``variable_shape`` is smaller than 1.
    """
    # A zero or negative dimension leads to divisions by zero or NaN below.
    if any(dimension_size < 1 for dimension_size in variable_shape):
        raise ValueError("All dimensions of variable_shape must be at least 1")

    # Calculate ideal number of chunks
    ideal_number_of_chunks = calculate_ideal_number_of_chunks(
        variable_shape, float_size, chunk_size
    )
    number_of_chunks_per_axis = ideal_number_of_chunks ** 0.25

    # Initialize the first candidate chunking shape
    first_dimension, number_of_chunks_per_axis = adjust_first_dimension(
        variable_shape, number_of_chunks_per_axis
    )
    first_candidate_chunking_shape = [first_dimension]

    # Factor to increase other dimensions to at least 1 if required
    sizing_factor = 1.0
    for dimension_size in variable_shape[1:]:
        if dimension_size / number_of_chunks_per_axis < 1:
            sizing_factor *= number_of_chunks_per_axis / dimension_size

    # Adjust other dimensions
    for dimension_size in variable_shape[1:]:
        if dimension_size / number_of_chunks_per_axis < 1:
            first_candidate_chunking_shape.append(1.0)
        else:
            first_candidate_chunking_shape.append(
                (sizing_factor * dimension_size) // number_of_chunks_per_axis
            )

    # Fine-tuning to find the best chunk shape: try every combination of
    # adding 1 to a dimension. The count follows the rank of the shape
    # (2**3 = 8 for a 3D array) so that every dimension can be perturbed.
    best_chunk_size = 0
    best_chunking_shape = first_candidate_chunking_shape
    for index in range(2 ** len(first_candidate_chunking_shape)):
        candidate_chunking_shape = perturb_shape(first_candidate_chunking_shape, index)
        this_chunk_size = float_size * np.prod(candidate_chunking_shape)
        if best_chunk_size < this_chunk_size <= chunk_size:
            best_chunk_size = this_chunk_size  # Update best chunk size
            best_chunking_shape = list(candidate_chunking_shape)  # Update best shape

    return [int(dimension_size) for dimension_size in best_chunking_shape]
# (GitHub page footer, not part of the gist: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")