Created
January 28, 2025 12:12
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Test reading (and writing) Zarr data with host and GPU.

    $ python gputest.py gpu read
    gpu read 2.96

    $ python gputest.py cpu read
    cpu read 1.24
""" | |
import argparse | |
import dataclasses | |
import time | |
import cupy as cp | |
import numcodecs | |
import numcodecs.registry | |
from kvikio.nvcomp_codec import ensure_contiguous_ndarray_like | |
import zarr | |
import zarr.core.buffer.gpu | |
import zarr.registry | |
import zarr.storage | |
from zarr.abc.codec import BytesBytesCodec | |
from zarr.core.array_spec import ArraySpec | |
@dataclasses.dataclass(frozen=True)
class GPUZstdCodec(BytesBytesCodec):
    """Zarr bytes-to-bytes Zstd codec that runs on the GPU via nvCOMP.

    The actual work is delegated to the numcodecs codec registered under the
    id ``"nvcomp_batch"`` with ``algorithm="zstd"`` (presumably registered as
    a side effect of importing ``kvikio.nvcomp_codec`` -- TODO confirm).
    """

    name: str = "zstd"
    configuration: dict = dataclasses.field(default_factory=dict)
    is_fixed_size = True

    # NOTE(review): ``level`` and ``checksum`` are declared but never read in
    # this class; they are not forwarded to the nvCOMP codec.
    level: int = 0
    checksum: bool = False

    def __post_init__(self) -> None:
        """Look up the nvCOMP batch codec and cache it on the instance."""
        # The dataclass is frozen, so normal attribute assignment would raise;
        # go through ``object.__setattr__`` to attach the private helper.
        object.__setattr__(
            self,
            "_codec",
            numcodecs.registry.get_codec({"id": "nvcomp_batch", "algorithm": "zstd"}),
        )

    async def _encode_single(
        self, chunk_bytes: zarr.core.buffer.Buffer, chunk_spec: ArraySpec
    ) -> zarr.core.buffer.Buffer:
        """Compress one chunk with nvCOMP and return only the compressed bytes.

        Parameters
        ----------
        chunk_bytes
            Buffer holding the uncompressed chunk.
        chunk_spec
            Array spec whose ``prototype.buffer`` is used to wrap the result.

        Returns
        -------
        A buffer wrapping the compressed bytes, viewed as int8 (``"b"``).
        """
        # NOTE(review): this coroutine never awaits; the stream synchronize
        # below is a blocking call.
        bufs = [chunk_bytes.as_array_like()]
        num_chunks = len(bufs)
        bufs = [cp.asarray(ensure_contiguous_ndarray_like(b)) for b in bufs]
        buf_sizes = [b.size * b.itemsize for b in bufs]
        max_chunk_size = max(buf_sizes)

        # Get temp and output buffer sizes.
        algo = self._codec._algo
        temp_size = algo.get_compress_temp_size(num_chunks, max_chunk_size)
        comp_chunk_size = algo.get_compress_chunk_size(max_chunk_size)

        # Prepare data and size buffers.
        # uncomp_chunks is used as a container that stores pointers to actual
        # chunks. nvCOMP requires this and sizes buffers to be in GPU memory.
        uncomp_chunks = cp.array([b.data.ptr for b in bufs], dtype=cp.uintp)
        uncomp_chunk_sizes = cp.array(buf_sizes, dtype=cp.uint64)

        temp_buf = cp.empty(temp_size, dtype=cp.uint8)

        # One row per chunk, each row sized for the worst case.
        comp_chunks = cp.empty((num_chunks, comp_chunk_size), dtype=cp.uint8)
        # Array of pointers to each compressed chunk.
        comp_chunk_ptrs = cp.array([c.data.ptr for c in comp_chunks], dtype=cp.uintp)
        # Resulting compressed chunk sizes, filled in by ``compress``.
        comp_chunk_sizes = cp.empty(num_chunks, dtype=cp.uint64)

        algo.compress(
            uncomp_chunks,
            uncomp_chunk_sizes,
            max_chunk_size,
            num_chunks,
            temp_buf,
            comp_chunk_ptrs,
            comp_chunk_sizes,
            self._codec._stream,
        )
        # Wait for the kernel so that both the output and its size are valid.
        self._codec._stream.synchronize()

        # ``comp_chunks`` rows are allocated at the worst-case size with
        # ``cp.empty``; only the first ``comp_chunk_sizes[0]`` bytes hold
        # compressed data. Slice to that length so the uninitialised tail is
        # not handed on as part of the encoded chunk.
        compressed_size = int(comp_chunk_sizes[0].item())
        result = comp_chunks[0, :compressed_size]
        return chunk_spec.prototype.buffer.from_array_like(result.view("b"))

    async def _decode_single(
        self, chunk_bytes: zarr.core.buffer.Buffer, chunk_spec: ArraySpec
    ) -> zarr.core.buffer.Buffer:
        """Decompress one chunk with the wrapped nvCOMP codec.

        Parameters
        ----------
        chunk_bytes
            Buffer holding the compressed chunk.
        chunk_spec
            Array spec whose ``prototype.buffer`` is used to wrap the result.

        Returns
        -------
        A buffer wrapping the decoded bytes, viewed as int8 (``"b"``).
        """
        decoded = self._codec.decode(chunk_bytes.as_array_like())
        return chunk_spec.prototype.buffer.from_array_like(decoded.view("b"))
def parse_args(args=None):
    """Parse the command line: a device followed by a task.

    Parameters
    ----------
    args
        Argument list to parse; ``None`` lets argparse fall back to
        ``sys.argv[1:]``.

    Returns
    -------
    An ``argparse.Namespace`` with ``device`` (``"gpu"`` or ``"cpu"``) and
    ``task`` (``"write"`` or ``"read"``).
    """
    cli = argparse.ArgumentParser()
    # Both arguments are required positionals restricted to a fixed choice set.
    positionals = (
        ("device", ["gpu", "cpu"]),
        ("task", ["write", "read"]),
    )
    for dest, allowed in positionals:
        cli.add_argument(dest, choices=allowed)
    return cli.parse_args(args)
def main(args=None):
    """Run one timed Zarr write or read on the chosen device and print the time.

    Parameters
    ----------
    args
        Optional argument list forwarded to ``parse_args``; ``None`` means
        use the process command line.

    Prints a single line of the form ``"<device> <task> <seconds>"``.
    """
    args = parse_args(args)
    gpu = args.device == "gpu"
    write = args.task == "write"

    if gpu:
        # Route the "zstd" codec to the GPU implementation and switch zarr to
        # GPU buffers. The config entry names the class via ``__main__``, so
        # this only resolves when the file is executed as a script.
        zarr.registry.register_codec("zstd", GPUZstdCodec)
        zarr.config.enable_gpu()
        zarr.config.config["codecs"]["zstd"] = "__main__.GPUZstdCodec"
        store = zarr.storage.LocalStore("/tmp/data/test-gpu.zarr")
        xp = cp
    else:
        store = zarr.storage.LocalStore("/tmp/data/test-cpu.zarr")
        import numpy as xp

    # perf_counter is monotonic and intended for measuring elapsed time.
    t0 = time.perf_counter()

    if write:
        z = zarr.create_array(
            store,
            name="a",
            shape=(10_000, 10_000),
            chunks=(1_000, 1_000),
            dtype="float32",
            overwrite=True,
        )
        # Fill the array one 1000x1000 tile at a time (a 10x10 grid of tiles).
        for i in range(10):
            for j in range(10):
                src = xp.random.uniform(size=(1000, 1000))
                xs = slice(i * 1000, (i + 1) * 1000)
                ys = slice(j * 1000, (j + 1) * 1000)
                z[xs, ys] = src
    else:
        g = zarr.open_group(store)
        z = g["a"]
        arr = z[:]
        if not gpu:
            # Do the final copy here: move the data that was just read into
            # GPU memory so the CPU timing also ends with the data on the
            # device. Pass ``arr`` (the materialised result), not the zarr
            # array handle ``z``.
            cp.asarray(arr)

    t1 = time.perf_counter()
    print(f"{args.device} {args.task} {t1 - t0:0.2f}")
if __name__ == "__main__":
    # Script entry point. main() registers the GPU codec in zarr's config as
    # "__main__.GPUZstdCodec", so the benchmark is meant to be run directly.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment