Created
November 3, 2023 19:33
-
-
Save NikosAlexandris/1fbc82fd0578ce96a4d39cbffa4ed584 to your computer and use it in GitHub Desktop.
Determine optimal chunk shape for a 3D array
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math
import operator
from functools import reduce
from itertools import product
from typing import Annotated
from typing import List
from typing import Tuple

import numpy as np
import typer
from pydantic import BaseModel, ValidationError, conlist
from pydantic import ConfigDict
from pydantic import field_validator

from pvgisprototype.cli.typer_parameters import OrderCommands
class VariableShapeModel(BaseModel):
    """Validated shape of a variable: a list of 2 to 4 integers.

    The shape may be supplied either as a list of integers or as a
    comma-separated string (for example ``"8760,720,1440"``).
    """

    # Dimension sizes; the length constraint (2..4) is enforced by conlist.
    variable_shape: conlist(int, min_length=2, max_length=4)

    # mode='before' is required: in the default 'after' mode the conlist
    # check runs first and rejects strings, so the parsing branch below
    # would never be reached.
    @field_validator('variable_shape', mode='before')
    @classmethod
    def validate_shape(cls, value):
        """Turn a comma-separated string into a list of integers.

        Non-string input is returned unchanged and left to the field's own
        validation.

        Raises
        ------
        ValueError
            If any comma-separated item cannot be converted to ``int``.
        """
        if isinstance(value, str):
            try:
                return [int(item) for item in value.split(',')]
            except ValueError as error:
                raise ValueError("Invalid integer in the list") from error
        return value
# Typer application object for this module's command-line interface.
# NOTE(review): OrderCommands comes from pvgisprototype.cli.typer_parameters;
# presumably it controls the order in which commands are listed — confirm.
app = typer.Typer(
    cls=OrderCommands,
    add_completion=True,
    add_help_option=True,
    rich_markup_mode="rich",
    # Plain string: the original f-prefix had no placeholders to interpolate.
    help='Determine a good chunk layout',
)
def binlist(n, width=0):
    """Return the binary digits of a non-negative integer as a list of ints.

    n -- non-negative integer
    width -- minimum number of digits; the result is left-padded with zeros
             up to this length (default 0, i.e. no padding)
    """
    # bin() yields e.g. '0b101'; drop the '0b' prefix before padding.
    digits = bin(n)[2:]
    padded_digits = digits.zfill(width)
    return list(map(int, padded_digits))
def perturb_shape(shape, on_bits):
    """Return a copy of shape with 1 added wherever on_bits has a 1 bit.

    shape -- list of variable dimension sizes
    on_bits -- non-negative integer less than 2**len(shape); its binary
               digits, zero-padded to len(shape), are paired with the
               dimensions from left to right
    """
    increments = binlist(on_bits, len(shape))
    # zip stops at the shorter sequence, so any surplus bits are ignored.
    return [size + increment for size, increment in zip(shape, increments)]
def calculate_ideal_number_of_chunks(variable_shape: List[int], float_size: int, chunk_size: int) -> float:
    """Calculate the ideal number of chunks for a variable.

    Parameters
    ----------
    variable_shape:
        Dimension sizes of the variable
    float_size:
        Size of a single value in bytes
    chunk_size:
        Size of a chunk in bytes

    Returns
    -------
    The total number of values in the variable divided by the number of
    values that fit in one chunk (at least one value per chunk).

    Raises
    ------
    ZeroDivisionError
        If ``float_size`` is 0.
    """
    # A chunk always holds at least one value, even if chunk_size < float_size.
    ideal_number_of_values = max(chunk_size // float_size, 1)
    # math.prod multiplies Python ints with arbitrary precision, whereas a
    # NumPy integer product wraps around silently once it exceeds the
    # fixed-width integer type.
    total_number_of_values = math.prod(variable_shape)
    return total_number_of_values / ideal_number_of_values
def adjust_first_dimension(
    variable_shape: List[int],
    number_of_chunks_per_axis: float,
) -> Tuple[float, float]:
    """Size the first dimension of the chunk shape.

    The first dimension is divided by ``number_of_chunks_per_axis ** 2``.

    Parameters
    ----------
    variable_shape:
        Dimension sizes of the variable; only the first entry is used
    number_of_chunks_per_axis:
        Number of chunks per axis

    Returns
    -------
    A pair ``(first_dimension, number_of_chunks_per_axis)``. If the first
    dimension would come out smaller than 1, it is set to 1.0 and the
    returned number of chunks per axis is scaled up to compensate;
    otherwise the floor-divided size is returned together with the
    unchanged number of chunks per axis.
    """
    chunks_along_first_axis = number_of_chunks_per_axis ** 2
    first_dimension_ratio = variable_shape[0] / chunks_along_first_axis
    if first_dimension_ratio < 1:
        # The first axis cannot be split that finely: pin it to one value and
        # increase the number of chunks per axis for the remaining dimensions.
        return 1.0, number_of_chunks_per_axis / math.sqrt(first_dimension_ratio)
    return variable_shape[0] // chunks_along_first_axis, number_of_chunks_per_axis
def determine_chunking_shape(
    variable_shape: VariableShapeModel,
    float_size: int = 4,
    chunk_size: int = 4096,
) -> List[int]:
    """Determine a good chunk shape for a 3D array.

    Based on Python code and algorithm developed by Russ Rew, posted at
    "Chunking Data: Choosing Shapes",
    https://www.unidata.ucar.edu/blog_content/data/2013/chunk_shape_3D.py
    accessed on 31 October 2023

    The first axis is split into ``number_of_chunks_per_axis ** 2`` chunks
    and every other axis into ``number_of_chunks_per_axis`` chunks, which is
    why the ideal number of chunks is raised to the power 0.25 (the 3D case).

    Parameters
    ----------
    variable_shape:
        The shape of the array, either as a ``VariableShapeModel`` or as a
        plain sequence of dimension sizes
    float_size:
        Size of a float value in bytes
    chunk_size:
        Maximum allowable chunk size in bytes which cannot be greater
        than the size of the physical block

    Returns
    -------
    Chunk shape as a list of integers. If no candidate fits within
    ``chunk_size``, the unperturbed first candidate is returned.
    """
    # The body needs an indexable sequence: unwrap a model, pass through a list.
    shape = list(getattr(variable_shape, "variable_shape", variable_shape))

    # Calculate the ideal number of chunks and the per-axis split
    ideal_number_of_chunks = calculate_ideal_number_of_chunks(
        shape, float_size, chunk_size
    )
    number_of_chunks_per_axis = ideal_number_of_chunks ** 0.25

    # Initialize the first candidate chunking shape
    first_dimension, number_of_chunks_per_axis = adjust_first_dimension(
        shape, number_of_chunks_per_axis
    )
    first_candidate_chunking_shape = [first_dimension]

    # Dimensions too small to be split are pinned to 1; the factor accumulated
    # here enlarges the remaining dimensions to make up for them. It must be
    # complete before any dimension is sized, hence the two separate passes.
    sizing_factor = 1.0
    for dimension_size in shape[1:]:
        if dimension_size / number_of_chunks_per_axis < 1:
            sizing_factor *= number_of_chunks_per_axis / dimension_size

    # Adjust other dimensions
    for dimension_size in shape[1:]:
        if dimension_size / number_of_chunks_per_axis < 1:
            first_candidate_chunking_shape.append(1.0)
        else:
            first_candidate_chunking_shape.append(
                (sizing_factor * dimension_size) // number_of_chunks_per_axis
            )

    # Fine-tuning: try adding 1 to every subset of dimensions and keep the
    # largest candidate that still fits in chunk_size. One bit per dimension
    # gives 2**len(shape) candidates (8 for a 3D shape).
    best_chunk_size = 0
    best_chunking_shape = first_candidate_chunking_shape
    for index in range(2 ** len(shape)):
        candidate_chunking_shape = perturb_shape(first_candidate_chunking_shape, index)
        number_of_values_in_chunk = np.prod(candidate_chunking_shape)
        this_chunk_size = float_size * number_of_values_in_chunk
        if best_chunk_size < this_chunk_size <= chunk_size:
            best_chunk_size = this_chunk_size
            best_chunking_shape = list(candidate_chunking_shape)

    return list(map(int, best_chunking_shape))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment